2016-08-20 10 views
7

У меня есть 100 сотрудников (агентов), которые делят один ref, содержащий коллекцию заданий. Хотя эта коллекции есть задача, каждый работник получить одну задачу из этой коллекции (в dosync блоке), распечатать его, а иногда и положить его обратно в сборе (в dosync блоке):Странное поведение clojure ref

(defn have-tasks? 
    [tasks] 
    (not (empty? @tasks))) 

(defn get-task 
    [tasks] 
    (dosync 
    (let [task (first @tasks)] 
     (alter tasks rest) 
     task))) 

(defn put-task 
    [tasks task] 
    (dosync (alter tasks conj task)) 
    nil) 

(defn worker 
    [& {:keys [tasks]}] 
    (agent {:tasks tasks})) 

(defn worker-loop 
    [{:keys [tasks] :as state}] 
    (while (have-tasks? tasks) 
    (let [task (get-task tasks)] 
     (println "Task: " task) 
     (when (< (rand) 0.1) 
     (put-task tasks task)))) 
    state) 

(defn create-workers 
    [count & options] 
    (->> (range 0 count) 
     (map (fn [_] (apply worker options))) 
     (into []))) 

(defn start-workers 
    [workers] 
    (doseq [worker workers] (send-off worker worker-loop))) 

(def tasks (ref (range 1 10000000))) 

(def workers (create-workers 100 :tasks tasks)) 

(start-workers workers) 
(apply await workers) 

Когда я запускаю этот код, то последнее значение напечатано агентов (после нескольких попыток): 435445, 4556294, 1322061, 3950017. Но никогда 9999999 что я ожидаю. И каждый раз, когда коллекция действительно пуста в конце. Что я делаю неправильно?

Edit:

Я переписал уборщица петлю как можно более простым:

(defn worker-loop 
    [{:keys [tasks] :as state}] 
    (loop [] 
    (when-let [task (get-task tasks)] 
     (println "Task: " task) 
     (recur))) 
    state) 

Но проблема все еще существует. Этот код ведет себя так, как ожидалось, при создании одного и только одного рабочего.

+0

Является ли 'println' нить безопасной? –

+0

@ShannonSeverance No. Необходимо использовать _e.g._ like '(locking: out (println" ... "))', чтобы дать разборчивый вывод. –

ответ

4

Проблема здесь не имеет ничего общего с агентами и едва ли что-то связано с лени. Вот несколько сокращенный вариант исходного кода, который до сих пор проявляет проблему:

(defn f [init] 
    (let [state (ref init) 
     task (fn [] 
       (loop [last-n nil] 
       (if-let [n (dosync 
           (let [n (first @state)] 
           (alter state rest) 
           n))] 
        (recur n) 
        (locking :out 
        (println "Last seen:" last-n))))) 
     workers (->> (range 0 5) 
        (mapv (fn [_] (Thread. task))))] 
    (doseq [w workers] (.start w)) 
    (doseq [w workers] (.join w)))) 

(defn r [] 
    (f (range 1 100000))) 

(defn i [] (f (->> (iterate inc 1) 
        (take 100000)))) 

(defn t [] 
    (f (->> (range 1 100000) 
      (take Integer/MAX_VALUE)))) 

Выполнение этого кода показывает, что оба i и t, как ленивые, надежно работать, в то время как r надежно не делает. Проблема на самом деле является ошибкой параллелизма в классе, возвращаемом вызовом range. Действительно, эта ошибка зарегистрирована в this Clojure ticket и исправлена ​​с версии Clojure 1.9.0-alpha11.

Быстрый резюме ошибки в случае, если билет не доступен по какой-то причине: в внутренностями rest вызова на результат range, была небольшая возможность для условия гонки: «flag», что говорит «следующее значение уже вычислено» было set before the actual value itself, что означало, что второй поток мог видеть, что этот флаг является истинным, даже если «следующее значение» по-прежнему nil. Затем звонок в alter исправит это значение nil на ref. Он исправлен swapping the two assignment lines.

В случаях, когда результат range был либо принудительно реализован в одном потоке, либо завернут в другой ленивый сегмент, эта ошибка не появится.

1

Когда достигнуто последнее число в диапазоне, все еще более старые номера удерживаются рабочими. Некоторые из них будут возвращены в очередь, чтобы их снова обрабатывали.

Для того, чтобы лучше видеть, что происходит, вы можете изменить worker-loop для печати последнего задания обрабатывается каждым работник:

(defn worker-loop 
    [{:keys [tasks] :as state}] 
    (loop [last-task nil] 
    (if (have-tasks? tasks) 
     (let [task (get-task tasks)] 
     ;; (when (< (rand) 0.1) 
     ;; (put-task tasks task) 
     (recur task)) 
     (when last-task 
     (println "Last task:" last-task)))) 
    state) 

Это также показывает состояние гонки в коде, где задачи видели на have-tasks? часто принимается другими, когда get-task вызывается ближе к концу обработки задач.

Состояние гонки можно решить, удалив have-tasks? и вместо этого используя возвращаемое значение nil от get-task в качестве сигнала, что больше нет доступных задач (на данный момент).

Обновлено:

Как отмечалось, эта раса условия не объясняет эту проблему.

Ни проблема решается удалением возможного состояния гонки в get-task так:

(defn get-task [tasks] 
    (dosync 
    (first (alter tasks rest)))) 

Однако изменения get-task использовать явную блокировку, кажется, чтобы решить эту проблему:

(defn get-task [tasks] 
    (locking :lock 
    (dosync 
     (let [task (first @tasks)] 
     (alter tasks rest) 
     task)))) 
+2

Я не думаю, что это причина. Я могу прокомментировать выражение '(when (<(rand) ..." и не возвращать какие-либо задачи обратно в очередь, и он по-прежнему обрабатывает только часть. Кроме того, он возвращает только 10% всех задач в среднем и последние номера задач для печати перед остановкой иногда не являются даже половиной всей очереди, поэтому теория на самом деле не имеет смысла. Я посмотрел на это сегодня и надеюсь, что смогу найти ответ или что кто-то может это сделать. Вопрос: – Josh

+0

Да, вы правы насчет состояния гонки в моем коде, спасибо. Я переписал мой код как можно более простым, но проблема все еще там. –

3

Я спросил это question на Clojure Google Group, и это помогло мне найти ответ.

Проблема в том, что я использовал ленивую последовательность в транзакции STM.

Когда я заменил этот код:

(def tasks (ref (range 1 10000000))) 

этим:

(def tasks (ref (into [] (range 1 10000000)))) 

он работал, как и ожидалось!

В моем производственном коде, где возникла проблема, я использовал структуру Korma, которая также возвращает ленивую коллекцию кортежей, как в моем примере.

Заключение: избегайте использования ленивых структур данных в рамках транзакции STM.

+0

Поскольку вы используете только один реф для своего состояния: пытались ли вы использовать а вместо ref? Это похоже на тот же результат и большое (_ca_ 10 x) сокращение времени выполнения. –

+0

Да, я думал об этом, но я не знаю, как написать скоординированную атомную функцию «get-task» «в этом случае. Какую функцию я должен передать функции swap!» –

+1

Либо '(defn get-taks [tasks] (пусть [my-tasks @tasks] (если (compare-and-set! задачи my-tasks (rest my-tasks)) (первые мои задачи) (recur tasks)))) 'или запускать задачи с манекеном 0 и делать' (defn get-tasks [tasks] (сначала (swap! tasks) rest))) ' –