查看Gpars数据流/管道,但我不了解
如果你看下面的例子(我已经用运算符、piplines、chainWith完成了这项工作,并遇到了同样的问题)。
在这个例子中,我使用了任务,但也可以很容易地不使用,并且同样的问题也会显现出来。在本例中,我设置了两个DataflowQueues,一个用于初始条件,另一个用于根据谓词求值的结果。然后,我布局了一个管道,根据谓词的输入来评估输入(甚至是测试),并将结果存储在输出结果队列中
设置好管道并将一些条目发布到第一个队列后,我相信这些条目会在数据可用时进行处理(这对操作员版本也不起作用),正如你所看到的,在我将条目写入会话Q后,我测试了resultQ的大小为零(如果我删除了仍然为true的任务)。因此,写入数据不会"触发"处理。
第一个任务将多个条目保存到队列中。
import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise
/**
* Created by will on 13/01/2017.
*/
def iValues = [1,2,3,4,5]
DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ = new DataflowQueue()
Dataflow.task {
println "setup task: set initial conditions list for rule predicate "
iValues.each {sessionQ << it}
}
Closure evenPredicate = {it %2 == 0}
//layout pipeline
sessionQ | evenPredicate | resultQ
assert resultQ.iterator().size() == 0
Promise ans = Dataflow.task {
println "result task : get three values from result q "
def outlist = []
3.times {
def res = resultQ.val
println "got result $res"
outlist << res
}
assert sessionQ.iterator().size() == 0
assert resultQ.iterator().size() == 2
outlist
}
println "ans list is $ans.val"
assert resultQ.iterator().size() == 2
只有在第二个任务/chainWith等中,在第二队列上调用.val(或get()),引擎才会开始运行,所有条目都会从第一个队列中处理,结果绑定到resultQ。
您可以从断言中看到这一点,因为一旦进行了第一个触发器(.val)同步调用,引擎就会运行并处理启动会话Q中的所有绑定条目。
这是一个问题,因为在您运行第一个.val调用之前-如果您执行poll()或resultQ.interator.size(),例如它是空的且未绑定,则size()=0。所以你不能写
for (dfRes in resultQ) {//do something with dfRes}
因为它总是空的,直到您使用会话Q中的第一个项目。我不明白为什么?在条目被绑定到第一个数据流队列后,我认为这些条目会在可用(被绑定)时被消耗掉,但事实并非如此。
这现在很棘手,因为你无法通过条目,检查结果的大小,对resultQ执行poll(),因为它将失败,直到从sessionQ读取第一个DF。
我最终不得不使用初始值数组的大小(告诉我保存到队列中的条目)作为ONLY,这意味着从resultQ中读取相同的数字以清空它(在上面的内容中,我只消耗了resultsQ中的3条记录,断言显示resultQ中仍有2条记录(但只有在第一次调用.val之后,如果您注释该行,则所有断言都开始失败)
我用Dataflow.operator、Pipeline等尝试过,但也遇到了同样的问题。为什么工作没有得到处理,因为每个输入都绑定到SessionQ?
最后,在Pipeline的情况下,有一个.complete()方法,如果您处理管道中的闭包{},它将保持打开状态(!complete()),但当您运行像.binaryChoice()这样的方法时,它会将管道标记为完成,并且不能添加进一步的操作。为什么要这样做?
当然,我不明白那个状态在说什么(不会再进行处理),如果你试图在这样的方法之后再执行一步,就会抛出异常。
不管怎样,我试过像这样的流水线
Pipeline pipeLine = new Pipeline(Q)
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) }
然而,当你将值绑定到Q时,什么都不会发生——直到你消耗了像这样的输出
odd.val
当管道突然"运行"并处理Q.中存储的所有DF项目时
除了第一个.val消耗之外,我没有尝试过启动工作的时间表
任何人都能解释为什么会这样吗?我一定错过了这里的要点,但在读取第一个条目之前,这种"什么都不做"不是我所期望的,并使对DataflowWriteChannel目标的任何大小评估(.iterator.size()、poll()等)类型调用无效。
我非常感谢在这方面的任何帮助——我已经为此挣扎了两天,但一无所获。我也查看了所有的Gpars测试,它们调用.val的次数与绑定输入的次数相同,所以不要显示我描述的问题。
在断言大小为0之前进行一次小的修改(添加延迟)将表明计算是由写入的数据触发的:
//layout pipeline
sessionQ | evenPredicate | resultQ
sleep 5000
assert resultQ.iterator().size() == 0