2016-10-19 8 views
0

Я хотел бы комбинировать последние с потоками Akka, как описано here.Akka Streams - объединить последние операции

Я не могу понять, как это сделать - пожалуйста, помогите!

Thanks, Ryan.

+0

Не могли бы вы дать некоторые дополнительные особенности? Каков вклад, что является потребителем. Здесь у меня есть тест interop, который показывает, как RxJava 2 может работать с Akka-Stream: https://github.com/akarnokd/akarnokd-misc/blob/master/src/jmh/java/hu/akarnokd/comparison/AkkaStreamsCrossMapPerf. java # L54 – akarnokd

+0

Какая специфика? Я хотел бы иметь возможность использовать два потока точно так же, как функция 'zip', но вместо семантики zip я получаю после семантики последней комбинации, которую я связывал. Проверяли связь, и я не уверен, как она мне помогает? Может быть, мы говорим в перекрестных целях? –

+0

Я думал, что вы хотите объединить два потока Akka, но ему не хватает оператора, поэтому вы захотите повторно использовать оператор combLatest RxJava. – akarnokd

ответ

1

Я только что реализовал его быстро. Не уверен, что его не грозит, но стоит попробовать :) https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2 Комментарии под сенсором приветствуются!

Как мы говорили на канале gitter, его невозможно достичь с помощью поэтапно, но вы можете написать функциональность с помощью настраиваемого этапа. Вам понадобятся два входа и один выход (может быть увеличен до N входа), поэтому он имеет форму вентилятора.

Я сохраняю входящие элементы в параметрах и всякий раз, когда вход готов (иначе отправляйте элемент) Я сохраняю данный элемент в опции. Всякий раз, когда на выходе нужен элемент (и у нас уже есть один элемент с обоих входов), я даю ему значения из опций в виде набора. Это подход с противодавлением.

Для обратного подхода (где вы производите все пары) вам нужно обрабатывать ожидающий «другой» выходной элемент, а затем последний, и ему нужно обрабатывать входные выходы. Я думаю, что моя реализация по-прежнему не справляется с слишком быстрыми производителями с медленным потребительским случаем (мы можем пропустить один элемент, умеем обрабатывать с испусканием), и может затормозить, если оба входа производят один и тот же элемент несколько раз (возможно, испускания могут справиться и с этим).

Если вы хотите расширить свою функциональность кода или хотите писать другие пользовательские этапы прочитать: http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html

+0

Отредактировано конструктивными мыслями, возможными ошибками и упомянутой документацией и контекстом «до». (Я не думаю, что копировать весь код - хорошая идея ...) – tg44