2013-09-29 14 views
9

Этот код безопасен?RxJava thread-safety

Observable<String> observable = ... // some observable that calls 
            // onNext from a background thread 

observable 
    .scan(new ArrayList<String>(), (List<String> acc, String next) -> { 
    acc.add(next); 
    return acc; 
    }) 
    .subscribe(list -> { 
    // do somethind with sequence of lists 
    ... 
    }); 

Мне любопытно, что ArrayList не является потокобезопасной структурой данных.

+0

Рекомендации по дизайну Rx полезны: http://go.microsoft.com/fwlink/?LinkID=205219. –

+0

Возможный дубликат [Is SerializedSubject, необходимый для обеспечения безопасности потоков в RxJava] (http://stackoverflow.com/questions/31841809/is-serializedsubject-necessary-for-thread-safety-in-rxjava) –

ответ

6

Как быстрый ответ, в .NET (исходная реализация Rx) все значения из наблюдаемой последовательности можно считать последовательными. Это не исключает многопоточности. Однако, если вы создаете значения в многопоточном режиме, вы можете потребовать последовательный характер, ища эквивалентную функцию для оператора Rx .NET Synchronize().

Другой вариант - проверить реализацию Scan в исходном коде RxJava, чтобы подтвердить, что он обеспечивает последовательный характер, который вы хотите/ожидаете, чтобы обеспечить безопасность в вашей функции аккумулятора.

+0

'scan' в RxJava делает то же самое, что и исходная версия .Net. – allprog

+0

Cheers. Я в основном говорю, что со временем я не могу быть уверенным, что базы .NET и Java будут одинаковыми. –

4

Если этот код не является потокобезопасным, то либо RxJava сломан, либо поврежден источник Observable - операторы, не являющиеся второстепенными, являются частью контракта Rx.

+0

И это может быть принудительно применено к оператору наблюдения Rx 'Checked()', если RxJava фактически реализовал его. Потяните запрос, кто-нибудь? –

+0

Защита резьбы для блоков не так проста, на мой взгляд. В коде, показанном в сообщении, не содержится много деталей. Я предполагаю, что @ORionll не заметил, что подписка доставляется синхронно по умолчанию. Но если он был преобразован в асинхронную подписку, то доступ к списку массивов не будет более безопасным. – allprog

+1

@allprog Вы правы, что это не так просто, но это одно из преимуществ Rx, поскольку он делает работу за кулисами, чтобы сделать это безопасным. –

 Смежные вопросы

  • Нет связанных вопросов^_^