2013-06-01 10 views
8

Я заранее извиняюсь за длину этой заметки. Я потратил немало времени на то, чтобы сделать его короче, и это было так мало, как я мог его получить.rxjava и clojure asynchrony mystery: фьючерсные обещания и агенты, о мой

У меня есть тайна и я буду благодарна за помощь. Эта тайна исходит из поведения rxjava observer Я написал в Clojure за пару простых observable s, вырезанных из онлайн-образцов.

Один наблюдаемый синхронно отправляет сообщения обработчикам onNext своих наблюдателей, и мой якобы принципиальный наблюдатель ведет себя так, как ожидалось.

Другие наблюдаемые асинхронно делают то же самое, в другом потоке, через Clojure future. Тот же самый наблюдатель не фиксирует все события, отправленные в его onNext; он просто потерял случайное количество сообщений в хвосте.

Существует намеренная гонка в следующем между истечением ожидания для promise г onCompleted и истечением ожидания для всех событий, передаваемых на agent коллектора. Если победит promise, я ожидаю увидеть false для onCompleted и возможно короткую очередь в agent. Если победит agent, я ожидаю увидеть true за onCompleted и все сообщения из очереди agent. Единственный результат, который я НЕ ожидаю, - true для onCompleted И короткая очередь от agent. Но Мерфи не спит, и это именно то, что я вижу. Я не знаю, виновата ли сборка мусора или какая-то внутренняя очередь на STM Clojure, или моя глупость, или что-то еще совсем.

Я представляю источник в порядке его автономной формы здесь, так что его можно запустить непосредственно через lein repl. Есть три cermonials, чтобы выйти из пути: во-первых, файл Leiningen проекта, project.clj, который декларирует зависимость от версии 0.9.0 от rxjava Нетфликса:

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

Теперь, пространства имен и требование Clojure и импорт Java :

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

Наконец, макрос для вывода на консоль:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

Наконец, к моему наблюдателю. Я использую agent для сбора сообщений, отправленных любым наблюдаемым onNext. Я использую atom для сбора потенциального потенциала onError. Я использую promise для onCompleted, чтобы потребители, внешние по отношению к наблюдателю, могли ждать на нем.

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

Теперь это синхронное наблюдение. Он накачивает 25 сообщений вниз по горлам своих наблюдателей, затем называет их onCompleted.

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

Мы поддерживаем наш наблюдатель в этой наблюдаемой:

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

Он работает, как ожидалось, и мы видим следующие результаты на консоли

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

Вот асинхронная наблюдаемым, что делает точно так же, только на резьбе future:

(defn- customObservableNonBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method 
     (let [f (future 
       ;; On another thread, send 25 strings: 
       (doseq [x (range 25)] 
        (-> observer (.onNext (str "AsynchValue_" x)))) 
       ; After sending all values, complete the sequence: 
       (-> observer .onCompleted))] 
     ; Return a disposable (unsubscribe) that cancels the future: 
     (Subscriptions/create #(future-cancel f)))))) 

;;; For unknown reasons, the following does not produce all 25 events: 
(-> (customObservableNonBlocking) 
    (subscribe-collectors)) 

Но, удивление, вот что мы видим на консоли: true для onCompleted, подразумевая, что promise DID NOT TIME-OUT; но только некоторые из асинхронных сообщений. Фактическое количество сообщений, которые мы видим, варьируется от run to run, подразумевая, что в игре наблюдается некоторое явление параллелизма. Ключи оценены.

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

ответ

7

await-for на агенте означает блокирует текущий поток до тех пор, все действия не отправили, таким образом, далеко (из этого потока или агента) для агентов произошли, что означает, что это может случиться так, что после AWAIT есть еще какой-то другой поток, который может отправлять сообщения агенту, и это то, что происходит в вашем случае. После того, как ваш ожидание на агенте закончится, и вы измените его значение в ключе :onNext на карте, то вы ждете обещанного обещания, которое после ожидания остается верным, но в то же время некоторые другие сообщения были отправлены на агента, собираемого в вектор.

Вы можете решить эту проблему, указав ключ :onCompleted в качестве первого ключа на карте, который в основном означает ожидание завершения, а затем ждать агентов coz к тому времени больше нет send звонки на агента могут произойти после того, как уже получили onCompleted.

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

проверено и проверено. –