У меня есть org.reactivestreams.Processor
, который я бы хотел использовать с RxJava 2.0. Однако, хотя есть преобразования для интеграции org.reactivestreams.Publisher
с RxJava, например io.reactivex.Flowable#fromPublisher
, мне не ясно, как лучше всего интегрировать org.reactivestreams.Processor
(или org.reactivestreams.Subscriber
). Может ли кто-нибудь осветить этот свет?Использование процессора реактивных потоков с RxJava 2.0
0
A
ответ
0
Вы обернуть Publisher
стороны и держать Subscriber
сторону как:
Processor proc = ...
Subscriber sub = proc;
Flowable flow = Flowable.fromPublisher(proc);
flow.map(v -> v.toString()).subscribe(System.out::println);
sub.onNext(1);
Хм, но тогда я мог бы нарушить реактивный контракт потоков, что указует, что 'onNext' не должен вызываться чаще, чем просили через 'подписка # запрос (длинный)'. –
Это зависит от того, где вы получили этот процессор, или если он координирует нисходящие запросы или нет. Процессоры RxJava не координируют, и они всегда будут запрашивать Long.MAX_VALUE, если вы отправляете им подписку. – akarnokd
Процессор подтверждает [spec] (https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification) и успешно интегрирован с потоками akka. Я думал, что в отличие от RxJava 1.x, RxJava 2.0 поддерживает противодавление ... –