2016-06-03 12 views
0

Мой упрощенный Akka Camel приложение устанавливается следующим образом:Akka Camel: Сообщения, направляемые к неправильному маршруту SEDA

AppleProducer -> seda:appleRoute -> AppleConsumer 

OrangeProducer -> seda:orangeRoute -> OrangeConsumer 

Что я вижу, хотя в том, что Apple события периодически потребляются в OrangeConsumer, и наоборот.

Выполнение этого примера (возможно, несколько раз) ниже воссоздает его.

Я не понимаю, как это происходит только с перерывами. Что я делаю не так?

object TestApp extends App { 
    implicit val system = ActorSystem() 
    val camel = CamelExtension(system) 
    val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer") 
    system.actorOf(Props(classOf[MyAppleConsumer], "seda:appleRoute"), "AppleConsumer") 
    val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer") 
    system.actorOf(Props(classOf[MyOrangeConsumer], "seda:orangeRoute"), "OrangeConsumer") 

    appleProducer ! new Apple("1") 
    orangeProducer ! new Orange("1") 
    appleProducer ! new Apple("2") 
    orangeProducer ! new Orange("2") 
    appleProducer ! new Apple("3") 
    orangeProducer ! new Orange("3") 
    appleProducer ! new Apple("4") 
    orangeProducer ! new Orange("4") 
    appleProducer ! new Apple("5") 
    orangeProducer ! new Orange("5") 
    appleProducer ! new Apple("6") 
    orangeProducer ! new Orange("6") 

} 

class MyProducer(route: String) extends Actor with ActorLogging { 

    def receive = { 
    case payload: Any => 
     val template = CamelExtension(context.system).template 
     template.setDefaultEndpointUri(route) 
     template.sendBody(payload) 
    } 
} 

class MyAppleConsumer(route: String) extends Consumer with ActorLogging { 
    override def endpointUri: String = route 

    override def receive: Receive = { 
    case event: CamelMessage if event.body.isInstanceOf[Apple] => 
     log.info("Received event {}", event.body) 
    case _ => throw new IllegalArgumentException("Invalid entity") 
    } 
} 

class MyOrangeConsumer(route: String) extends Consumer with ActorLogging { 
    override def endpointUri: String = route 

    override def receive: Receive = { 
    case event: CamelMessage if event.body.isInstanceOf[Orange] => 
     log.info("Received event {}", event.body) 
    case _ => throw new IllegalArgumentException("Invalid entity") 
    } 
} 

class Apple(id: String) 
class Orange(id: String) 

ответ

0

Думаю, мне удалось в конце концов понять это.

Вопрос не имеет отношения к SEDA. Вместо этого кажется, что тот же DefaultProducerTemplate возвращается для нескольких экземпляров MyProducer.

Таким образом, есть иногда состояние гонки при установке defaultEndpointUri

решение, для меня, было создать только один экземпляр MyProducer актера, чтобы гарантировать, что мы не сталкиваемся с этим состояние гонки

0

I рекомендовал бы расширить черту Producer вместо шаблона для вашего MyProducer так же, как вы используете Consumer для своих MyAppleConsumer и MyOrangeConsumer.

class MyProducer(route: String) extends Producer with OneWay { 
    def endpointUri = route 
} 

Более подробную информацию можно найти здесь: http://doc.akka.io/docs/akka/snapshot/scala/camel.html

Я считаю, вы должны быть в состоянии упростить код, как это (отказ от ответственности: не компилируется или испытано!):

case class Apple(id: String) 
case class Orange(id: String) 

object TestApp extends App { 
    implicit val system = ActorSystem() 

    val appleProducer = system.actorOf(Props(classOf[MyProducer], "seda:appleRoute"), "AppleProducer") 
    system.actorOf(Props(classOf[MyConsumer], "seda:appleRoute"), "AppleConsumer") 
    val orangeProducer = system.actorOf(Props(classOf[MyProducer], "seda:orangeRoute"), "OrangeProducer") 
    system.actorOf(Props(classOf[MyConsumer], "seda:orangeRoute"), "OrangeConsumer") 

    appleProducer ! Apple("1") 
    orangeProducer ! Orange("1") 
    appleProducer ! Apple("2") 
    orangeProducer ! Orange("2") 
    appleProducer ! Apple("3") 
    orangeProducer ! Orange("3") 
    appleProducer ! Apple("4") 
    orangeProducer ! Orange("4") 
    appleProducer ! Apple("5") 
    orangeProducer ! Orange("5") 
    appleProducer ! Apple("6") 
    orangeProducer ! Orange("6") 

} 

class MyProducer(route: String) extends Producer with OneWay with ActorLogging { 
    def endpointUri = route 
} 

class MyConsumer(route: String) extends Consumer with ActorLogging { 
    override def endpointUri: String = route 

    override def receive: Receive = { 
    case CamelMessage(body : Apple, headers) => 
     log.info("Received event {}", body) 
    case CamelMessage(body : Orange, headers) => 
     log.info("Received event {}", body) 
    case _ => throw new IllegalArgumentException("Invalid entity") 
    } 
} 
+0

Мне нужно добавьте настраиваемую логику создания сообщений в моем Продюсере (что я оставил в примере), поэтому я использовал шаблон. – DJ180