2017-02-22 28 views
1

У меня есть akka http streams для реализации сервера WebSocket. Тем не менее, клиент отправляет много данных каждую минуту. Таким образом, соединение поддерживается между клиентом и сервером.Akka http streams с использованием раковины памяти растет

При внедрении обработчика сетевых сокетов я использовал поток и раковину. И используя Sink.seq. Раковина - это то, что продолжает собирать входящие элементы, пока восходящий поток не прекратится.

Как этого избежать?

implicit val system = ActorSystem("DecoderSystem") 
    implicit val materializer = ActorMaterializer() 
    val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq) 
    streamedMsgFuture.onComplete { completedStream => 
     var completeBytestring = new ByteStringBuilder() 
     //I'm sure there's a better way to do this.. but hey, it's O(n) 
     completedStream.foreach { x => 
     x.foreach { y => 
      completeBytestring ++= y 
     } 
     } 

ответ

1

Как решить эту проблему полностью зависит от того, как вы хотите использовать входящие данные. Это не ясно из вопроса, поэтому я собираюсь предположить, что вы хотите выполнить какое-то действие для каждого входящего ByteString. Вы всегда можете усложнить эту логику, введя некоторый этап дозирования - потоки akka могут помочь вам в этом.

Вы можете выполнить действие на любом отдельном входящем ByteString с помощью комбинаторов map/mapAsync (в зависимости от того, является ли ваше действие синхронным/асинхронным). Пример ниже:

def action(msg: ByteString): SomeResult = ??? // do something 
def asyncAction(msg: ByteString): Future[SomeResult] = ??? // do something 

def handleResult(result: SomeResult): Unit = ??? // do something 

streamedMsg.map(action).runForeach(handleResult) 
streamedMsg.mapAsync(5)(asyncAction).runForeach(handleResult)