В реакторе нет смысла отказываться от Subscription
, прежде чем вы позвоните subscribe()
(так как это очень метод, который создает Subscription
и распространяет этот сигнал на цепочку, чтобы начать эмиссию данных).
Нет централизованного места со всеми подписками, что не имеет особого смысла, потому что вам нужно найти конкретные подписки, которые вы хотите отменить (и имейте в виду, что каждый оператор в вашей цепочке может использовать промежуточная подписка ...).
Обратите внимание, что некоторые операторы также отменит подписку от вашего имени! То есть в случае take(int)
, например, который отменит вверх достаточно один раз детали были выброшены:
Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);
Выведет:
14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel()
О, я не знал о том, что оператор: O Но для меня самым важным было то, что объект Cancellation отправлял сигнал OnCancel, который я мог бы правильно обработать. Но все-таки я нашел обходной путь к моей проблеме - я отменяю потоки, бросая исключения в Flux, так что это неплохо – Kapitalny
Я бы посоветовал вам предпочесть использовать объект «Отмена». Обратите внимание, что он станет «одноразовым» в 3.1 (в этот момент вам придется вызывать 'dispose()', а не 'cancel()'). Это или посмотрите на операторов, которые могли бы естественно соответствовать тому, что вы хотите сделать, и отменить для вас, если это необходимо ... Бросание исключений в поток не слишком хорошо подходит для решения, в зависимости от вашего варианта использования. –
@ SimonBaslé Я запускаю ваш код, и если 'take (2)' находится перед 'log()', тогда сигнал 'cancel()' не печатается. Зачем? Вы сказали, что оператор 'take' отменяет поток, а не источник. –