2015-07-23 3 views
1

On: akka-stream-experimental_2.11 1.0.

Мы используем Framing.delimiter на сервере Tcp. Когда сообщение прибывает с длиной, большей, чем maximumFrameLength, генерируется исключение FramingException, и мы можем захватить его из OnError для ActorSubscriber.

сервер Код:

def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int, maxFrameLength: Int) 
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = { 
    val sink = Sink.foreach { 
     conn: Tcp.IncomingConnection => 
     val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight)))) 

     val targetSink = Flow[ByteString] 
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true)) 
      .map(raw ⇒ Message(raw)) 
      .to(Sink(targetSubscriber)) 

     conn.flow.to(targetSink).runWith(Source(Promise().future)) 
    } 
    val connections = Tcp().bind(address, port) 
    connections.to(sink).run() 
    } 

код подписчик:

class TargetSubscriber(target: ActorRef, maxInFlight: Int) extends ActorSubscriber with ActorLogging { 
    private var inFlight = 0 

    override protected def requestStrategy = new MaxInFlightRequestStrategy(maxInFlight) { 
    override def inFlightInternally = inFlight 
    } 

    override def receive = { 
    case OnNext(msg: Message) ⇒ 
     target ! msg 
     inFlight += 1 
    case OnError(t) ⇒ 
     inFlight -= 1 
     log.error(t, "Subscriber encountered error") 
    case TargetAck(_) ⇒ 
     inFlight -= 1 
    } 
} 

Проблема: сообщения, которые находятся под максимальной длиной кадра не текут после этого исключения для этого входящего соединения. убивая клиента, и он работает нормально.

ActorSubscriber не чтит supervision

Что такое правильный способ, чтобы пропустить плохое сообщение и продолжить со следующей хорошим сообщением?

ответ

0

Вы пытались поставить надзор на мойку targetFlow вместо всего материализатора? Я не вижу его здесь нигде, и я считаю, что он должен быть установлен непосредственно на этом потоке.

Stil это скорее предположение, чем науки;)