2017-01-13 5 views
0

нужна помощь. Глядя на потоки данных/конвейеры Gpars, но что-то я не понимаюgpars dataflowQueues обработка или конвейеры только срабатывает по запросу df.val

, если вы посмотрите на пример ниже (я сделал это с операторами, трубопроводами, цепью и попал в ту же проблему).

В этом примере я использовал задачи, но так же легко мог быть без и одинаковых проблемных проявлений. В этом примере я создаю два DataflowQueues, один для начальных условий и один для результатов оценки по предикату. Затем я компоную конвейер, который оценивает входы против входов против предиката (это даже тест) и сохраняет результаты в очереди выходных результатов

, установив конвейер и разместив некоторые записи в первую очередь, я полагал, что записи будут обработаны, поскольку данные были доступны (это также не работало для версии оператора), так как вы можете видеть, что я проверяю размер результатаQ своим нулем (если я удалю задачу, которая по-прежнему истинна) после того, как я напишу записи в sessionQ. Поэтому запись данных не «запускает» обработку.

первая задача сохранить количество записей в очереди.

import groovyx.gpars.dataflow.Dataflow 
import groovyx.gpars.dataflow.DataflowQueue 
import groovyx.gpars.dataflow.DataflowVariable 
import groovyx.gpars.dataflow.Promise 

/** 
* Created by will on 13/01/2017. 
*/ 

def iValues = [1,2,3,4,5] 

DataflowQueue sessionQ = new DataflowQueue() 
DataflowQueue resultQ = new DataflowQueue() 

Dataflow.task { 
    println "setup task: set initial conditions list for rule predicate " 
    iValues.each {sessionQ << it} 
} 

Closure evenPredicate = {it %2 == 0} 

//layout pipeline 
sessionQ | evenPredicate | resultQ 

assert resultQ.iterator().size() == 0 

Promise ans = Dataflow.task { 
    println "result task : get three values from result q " 
    def outlist = [] 
    3.times { 
     def res = resultQ.val 
     println "got result $res" 
     outlist << res 
    } 
    assert sessionQ.iterator().size() == 0 
    assert resultQ.iterator().size() == 2 
    outlist 
} 

println "ans list is $ans.val" 
assert resultQ.iterator().size() == 2 

его только во второй задаче/chainWith и т.д. - где вы вызываете .val (или получить()) на второй очереди, что двигатель начинает работать и все записи обрабатываются из первой очереди и результаты, связанные с результатом Q.

Вы можете видеть это из утверждений, как только первые триггерные (.val) вызовы синхронизации запускаются и обрабатывают ВСЕ связанные записи в начальном сеансеQ.

Это проблема, так как до тех пор, пока вы не запустите этот первый вызов .val, если вы выполните poll() или resultQ.interator.size(), например, он пуст и unbound, size() = 0. так что вы не можете написать

for (dfRes in resultQ) {//do something with dfRes} 

как всегда пусто, пока вы не будете использовать первый элемент из сеансаQ. Я не понимаю, почему? После того, как записи привязаны к первому методу dataflowQueue, я думал, что элементы будут потребляться, когда они станут доступны (привязаны), но они не являются.

Это сейчас сложно, поскольку вы не можете получить записи, проверить размер результатов, выполнить опрос(), на resultQ, поскольку он потерпит неудачу, пока не будет прочитан первый DF из sessionQ.

Мне пришлось использовать размер массива начальных значений (сообщает мне записи, сохраненные в очереди), поскольку ТОЛЬКО означает чтение того же числа с обратной стороны результатаQ для его удаления (в приведенном выше тексте I только потребляли 3 записи из результатовQ, и утверждение показывает, что в resultQ осталось 2 записи (но только после того, как был сделан первый вызов .val, если вы прокомментируете эту строку из всех утверждений, начинающихся с сбоя)

Я попытался это с Dataflow.operator, трубопровода и т.д., и получить такую ​​же проблему. поэтому не работа обрабатываются как каждый вход связан с SessionQ?

наконец, в случае трубопровода, Theres в .complete (), который, если вы обрабатываете замыкание {} в конвейере, остается открытым (! complete()), но когда вы запускаете метод типа .binaryChoice(), он помечает конвейер как завершенный, и никакие дальнейшие действия не могут быть добавлены. Почему это делается?

Я не понимаю, что говорит это состояние (не будет больше обработки), и исключение будет выбрано, если вы попытаетесь сделать еще один шаг после такого метода.

так или иначе - я пытался трубопроводную линию, как этот

Pipeline pipeLine = new Pipeline(Q) 
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) } 

однако, когда вы связываете значения Q ничего не происходит - пока вы не потребляете выход как

odd.val 

, когда вдруг Трубопроводу «запускает» и обрабатывает ВСЕ элементы DF, хранящиеся в Q.

Ничего не пробовал запускать планирование работы - кроме первого .val потребления

может объяснить, почему это так, я должен пропустить этот пункт здесь, но это «ничего не делать» до тех пор, пока первая запись не будет прочитана, это НЕ то, что я ожидал и сделал недействительной оценку размера (.iterator.size(), poll() и т. д.) на цель DataflowWriteChannel.

Я бы оценил любую помощь по этому вопросу - я боролся с этим в течение двух дней и не получил нигде. Я также просмотрел все тесты Gpars, и они просто называют .val столько же раз, сколько входы связаны - так что не показывайте проблему, которую я описал.

Вацлав Печ, или любой другой Gpars гуру, которые смотрят на вопросы, я хотел бы признателен за любую помощь понимание по этому вопросу, чтобы я над этим горб

относительно заранее

ответ

1

Небольшая модификация (добавление задержка) непосредственно перед утверждать, что размер 0 покажет, что вычисление инициируется письменной данных:

//layout pipeline 
sessionQ | evenPredicate | resultQ 
sleep 5000 
assert resultQ.iterator().size() == 0 
+0

спасибо Вацлав, я буду на поезде утром, может быть время, чтобы позволить усадьбу " догнать ", я уясню свои проблемы. Я пробовал несколько атак и выглядел так, как будто работа не обрабатывалась, как ожидалось, - сообщайте о результатах на различных «попытках», которые пытались увидеть, и посмотрел, не делает ли это трюк –

+0

я добавил промежуток сна 500 мс, и примеры начали искать право, и работа в других потоках имеет шанс сделать там материал на заднем плане. Я все еще не уверен, что «полное» состояние на конвейере защищается, хотя –

+0

Полное состояние ограничивает построитель Pipeline только для создания допустимых конвейеров. Трубопровод считается завершенным, когда вы больше не можете добавлять к нему операторов. Например, вызывая binaryChoice(), два канала подключаются к концу конвейера, и все дальнейшие операции с конвейером должны быть добавлены к любому из этих каналов, а не к исходному конвейеру. –