Я заранее извиняюсь за длину этой заметки. Я потратил немало времени на то, чтобы сделать его короче, и это было так мало, как я мог его получить.rxjava и clojure asynchrony mystery: фьючерсные обещания и агенты, о мой
У меня есть тайна и я буду благодарна за помощь. Эта тайна исходит из поведения rxjava observer
Я написал в Clojure за пару простых observable
s, вырезанных из онлайн-образцов.
Один наблюдаемый синхронно отправляет сообщения обработчикам onNext
своих наблюдателей, и мой якобы принципиальный наблюдатель ведет себя так, как ожидалось.
Другие наблюдаемые асинхронно делают то же самое, в другом потоке, через Clojure future
. Тот же самый наблюдатель не фиксирует все события, отправленные в его onNext
; он просто потерял случайное количество сообщений в хвосте.
Существует намеренная гонка в следующем между истечением ожидания для promise
г onCompleted
и истечением ожидания для всех событий, передаваемых на agent
коллектора. Если победит promise
, я ожидаю увидеть false
для onCompleted
и возможно короткую очередь в agent
. Если победит agent
, я ожидаю увидеть true
за onCompleted
и все сообщения из очереди agent
. Единственный результат, который я НЕ ожидаю, - true
для onCompleted
И короткая очередь от agent
. Но Мерфи не спит, и это именно то, что я вижу. Я не знаю, виновата ли сборка мусора или какая-то внутренняя очередь на STM Clojure, или моя глупость, или что-то еще совсем.
Я представляю источник в порядке его автономной формы здесь, так что его можно запустить непосредственно через lein repl
. Есть три cermonials, чтобы выйти из пути: во-первых, файл Leiningen проекта, project.clj
, который декларирует зависимость от версии 0.9.0
от rxjava Нетфликса:
(defproject expt2 "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[com.netflix.rxjava/rxjava-clojure "0.9.0"]]
:main expt2.core)
Теперь, пространства имен и требование Clojure и импорт Java :
(ns expt2.core
(:require clojure.pprint)
(:refer-clojure :exclude [distinct])
(:import [rx Observable subscriptions.Subscriptions]))
Наконец, макрос для вывода на консоль:
(defmacro pdump [x]
`(let [x# ~x]
(do (println "----------------")
(clojure.pprint/pprint '~x)
(println "~~>")
(clojure.pprint/pprint x#)
(println "----------------")
x#)))
Наконец, к моему наблюдателю. Я использую agent
для сбора сообщений, отправленных любым наблюдаемым onNext
. Я использую atom
для сбора потенциального потенциала onError
. Я использую promise
для onCompleted
, чтобы потребители, внешние по отношению к наблюдателю, могли ждать на нем.
(defn- subscribe-collectors [obl]
(let [;; Keep a sequence of all values sent:
onNextCollector (agent [])
;; Only need one value if the observable errors out:
onErrorCollector (atom nil)
;; Use a promise for 'completed' so we can wait for it on
;; another thread:
onCompletedCollector (promise)]
(letfn [;; When observable sends a value, relay it to our agent"
(collect-next [item] (send onNextCollector (fn [state] (conj state item))))
;; If observable errors out, just set our exception;
(collect-error [excp] (reset! onErrorCollector excp))
;; When observable completes, deliver on the promise:
(collect-completed [ ] (deliver onCompletedCollector true))
;; In all cases, report out the back end with this:
(report-collectors [ ]
(pdump
;; Wait for everything that has been sent to the agent
;; to drain (presumably internal message queues):
{:onNext (do (await-for 1000 onNextCollector)
;; Then produce the results:
@onNextCollector)
;; If we ever saw an error, here it is:
:onError @onErrorCollector
;; Wait at most 1 second for the promise to complete;
;; if it does not complete, then produce 'false'.
;; I expect if this times out before the agent
;; times out to see an 'onCompleted' of 'false'.
:onCompleted (deref onCompletedCollector 1000 false)
}))]
;; Recognize that the observable 'obl' may run on another thread:
(-> obl
(.subscribe collect-next collect-error collect-completed))
;; Therefore, produce results that wait, with timeouts, on both
;; the completion event and on the draining of the (presumed)
;; message queue to the agent.
(report-collectors))))
Теперь это синхронное наблюдение. Он накачивает 25 сообщений вниз по горлам своих наблюдателей, затем называет их onCompleted
.
(defn- customObservableBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method.
;; Send 25 strings to the observer's onNext:
(doseq [x (range 25)]
(-> observer (.onNext (str "SynchedValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted)
; return a NoOpSubsription since this blocks and thus
; can't be unsubscribed (disposed):
(Subscriptions/empty))))
Мы поддерживаем наш наблюдатель в этой наблюдаемой:
;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
(subscribe-collectors))
Он работает, как ожидалось, и мы видим следующие результаты на консоли
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["SynchedValue_0"
"SynchedValue_1"
"SynchedValue_2"
"SynchedValue_3"
"SynchedValue_4"
"SynchedValue_5"
"SynchedValue_6"
"SynchedValue_7"
"SynchedValue_8"
"SynchedValue_9"
"SynchedValue_10"
"SynchedValue_11"
"SynchedValue_12"
"SynchedValue_13"
"SynchedValue_14"
"SynchedValue_15"
"SynchedValue_16"
"SynchedValue_17"
"SynchedValue_18"
"SynchedValue_19"
"SynchedValue_20"
"SynchedValue_21"
"SynchedValue_22"
"SynchedValue_23"
"SynchedValue_24"],
:onError nil,
:onCompleted true}
----------------
Вот асинхронная наблюдаемым, что делает точно так же, только на резьбе future
:
(defn- customObservableNonBlocking []
(Observable/create
(fn [observer] ; This is the 'subscribe' method
(let [f (future
;; On another thread, send 25 strings:
(doseq [x (range 25)]
(-> observer (.onNext (str "AsynchValue_" x))))
; After sending all values, complete the sequence:
(-> observer .onCompleted))]
; Return a disposable (unsubscribe) that cancels the future:
(Subscriptions/create #(future-cancel f))))))
;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
(subscribe-collectors))
Но, удивление, вот что мы видим на консоли: true
для onCompleted
, подразумевая, что promise
DID NOT TIME-OUT; но только некоторые из асинхронных сообщений. Фактическое количество сообщений, которые мы видим, варьируется от run to run, подразумевая, что в игре наблюдается некоторое явление параллелизма. Ключи оценены.
----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
:onError @onErrorCollector,
:onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
["AsynchValue_0"
"AsynchValue_1"
"AsynchValue_2"
"AsynchValue_3"
"AsynchValue_4"
"AsynchValue_5"
"AsynchValue_6"],
:onError nil,
:onCompleted true}
----------------
проверено и проверено. –