2016-10-20 5 views
0

Я пытаюсь использовать Akka's ConsistentHashingRoutingLogic, чтобы гарантировать, что сообщения с одним и тем же ключом направляются к тому же Актеру. Важно, чтобы сообщения с одним и тем же ключом обрабатывались в порядке FIFO. Сообщения с разными клавишами могут быть перенаправлены в разные Актеры и обрабатываться параллельно. Я не использую Akka в распределенном режиме.Akka ConsistentHashingRoutingLogic не маршрутизируется на одну и ту же последовательность диспетчера постоянно

Сообщения на самом деле являются сообщениями JSON, считываемыми из брокера RabbitMQ, поэтому мой ведущий актер получает сообщение AMQP и использует ключ маршрутизации в качестве ключа сообщения. Тот же ключ также находится в самом сообщении. Актер является частью приложения Spring.

Мой Мастер Актер выглядит следующим образом:

@Named("MessageHandlerMaster") 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class MessageHandlerMaster extends UntypedActor { 

    private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class); 

    private Router router; 

    @Autowired 
    public MessageHandlerMaster(final SpringProps springProps) { 

    List<Routee> routees = Stream.generate(() -> { 
     ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class)); 
     getContext().watch(worker); 
     return new ActorRefRoutee(worker); 
    }).limit(5) //todo: configurable number of workers 
     .collect(Collectors.toList()); 

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees); 
    } 

    public void onReceive(Object message) { 
    if (message instanceof Message) { 
     Message amqpMessage = (Message) message; 
     String encoding = getMessageEncoding(amqpMessage); 
     try { 
     String json = new String(amqpMessage.getBody(), encoding); 
     String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey(); 
     log.debug("Routing message based on routing key " + routingKey); 
     router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender()); 
     } catch (UnsupportedEncodingException e) { 
     log.warn("Unknown content encoding sent in message! {}", encoding); 
     } 
    } else if (message instanceof Terminated) { 
     //if one of the routee's died, remove it and replace it 
     log.debug("Actor routee terminated!"); 
     router.removeRoutee(((Terminated) message).actor()); 
     ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class)); 
     getContext().watch(r); 
     router = router.addRoutee(new ActorRefRoutee(r)); 
    } 
    } 

    private static String getMessageEncoding(Message message) { 
    String encoding = message.getMessageProperties().getContentEncoding(); 
    if ((encoding == null) || (encoding.equals(""))) { 
     encoding = "UTF-8"; 
    } 
    return encoding; 
    } 
} 

я сначала получаю мастер один раз:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master"); 

, а затем просто подачи сообщения ему:

master.tell(message, ActorRef.noSender()); 

Но когда я печатаю журналы из моего рабочего onReceive(), я вижу, что разные потоки диспетчера иногда используется для одного и того же ключа.

Также непонятно, почему иногда тот же самый поток диспетчера используется для актера-мастера и для действующего актера. Не должно ли это быть асинхронным сообщением, проходящим между потоками?

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 

Как вы можете видеть здесь, диспетчеру нить для обработки сообщения рабочий с ключом 10420186 иногда 9, а иногда и 10. Мастер актер иногда также использовал эти 2 темы.

Как я могу быть уверенным, что ConsistentHashingRoutingLogic фактически работает, и один и тот же поток обрабатывает сообщения с одним и тем же ключом? Я что-то делаю неправильно в инициализации маршрутизатора?

+1

Я думаю, что актеры не связаны нитями. Поэтому в прилагаемом журнале нет ничего плохого. – vrudkovsk

ответ

0

Так что @vrudkovsk прав со своим комментарием. Я думаю, что вы путаетесь между нитями и актерами. Актеры - это просто объекты в памяти, у которых есть адрес и почтовый ящик. Диспетчеры - это, по сути, пулы потоков, которые выполняют действия с актером. Пример действие:

  • Dequeue сообщения из почтового ящика, чтобы обработать его в качестве актера
  • Enqueue сообщения в почтовый ящик.

Различные потоки могут выполнять действия для одного и того же актера. Это решено диспетчером. Akka гарантирует, что только один поток за раз обрабатывает сообщение внутри актера. Это не значит, что это всегда будет тот же самый поток.

Если вы хотите, чтобы убедиться, что они приходят к тем же актером, я рекомендовал бы войти путь актера или адрес с помощью context.self.path или context.self.path.address так как те уникальные идентификаторы в пределах одной и той же ActorSystem.

+0

Спасибо за ваш ответ.Поэтому в этом случае у меня все еще есть уверенность, что если у меня есть 5 действующих лиц, независимо от того, какой поток используется для Actor 1, поскольку я использую 'ConsistentHashingRoutingLogic', нет никакого способа, чтобы второй поток диспетчера забирал другое сообщение, которое должно быть перенаправлено в Актер 1 до обработки предыдущего сообщения? – jbx

+0

Это правильно. «ConsistentHashingRoutingLogic» гарантирует, что ваше сообщение окажется в правильном актере, независимо от того, какой поток выполняет эту работу. – hveiga