2016-11-13 8 views
0

У меня есть org.reactivestreams.Processor, который я бы хотел использовать с RxJava 2.0. Однако, хотя есть преобразования для интеграции org.reactivestreams.Publisher с RxJava, например io.reactivex.Flowable#fromPublisher, мне не ясно, как лучше всего интегрировать org.reactivestreams.Processor (или org.reactivestreams.Subscriber). Может ли кто-нибудь осветить этот свет?Использование процессора реактивных потоков с RxJava 2.0

ответ

0

Вы обернуть Publisher стороны и держать Subscriber сторону как:

Processor proc = ... 

Subscriber sub = proc; 
Flowable flow = Flowable.fromPublisher(proc); 

flow.map(v -> v.toString()).subscribe(System.out::println); 

sub.onNext(1); 
+0

Хм, но тогда я мог бы нарушить реактивный контракт потоков, что указует, что 'onNext' не должен вызываться чаще, чем просили через 'подписка # запрос (длинный)'. –

+0

Это зависит от того, где вы получили этот процессор, или если он координирует нисходящие запросы или нет. Процессоры RxJava не координируют, и они всегда будут запрашивать Long.MAX_VALUE, если вы отправляете им подписку. – akarnokd

+0

Процессор подтверждает [spec] (https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification) и успешно интегрирован с потоками akka. Я думал, что в отличие от RxJava 1.x, RxJava 2.0 поддерживает противодавление ... –