Grails异步Promise/for循环错误-如何将变量传递给任务闭包



我有很多文件需要解析,所以我在几个线程上进行解析。

int fileCount = 16
def promiseList = []
for (int i = 1; i <= fileCount; i++) {
    println i
    def p = task {
        println "${new Date()} Starting parse of schedules (${i})..."
        // do some parsing here, where I need access to the value i     
    }
    p.onError {Throwable t ->
        println "Serious error when loading schedule ${i}, ${t.getMessage()}"
        t.printStackTrace()
    }
    promiseList << p
}
waitAll(promiseList)

这里的想法是创建多个promise,将它们全部异步运行,然后让它们全部完成。

Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (1)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (3)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (4)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (5)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (6)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (7)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (8)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (9)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (10)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (11)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (12)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (14)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (15)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (16)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (17)...

问题:2号任务到底发生了什么?如果我放:

println i

进入for循环,则一切如预期(即,我得到1..16)。我认为变量正在增加,然后在稍后的某个点上,闭包查看变量,它已经增加了,因此值不正确。如何将值I传递给闭包,以便获得正确的值?

编辑:为了它的价值,我可以破解它,我真的不喜欢这样做,插入:

// for ()
    int taskNumber = i
    // create task here, refer to local variable taskNumber instead of i
    Thread.currentThread().sleep((long)(1000))

在for循环结束时,以便在下一个循环发生之前正确地实例化任务。然而,这是一个异常糟糕的破解,我相信一定有更好的方法将变量传递给线程?

进一步编辑:

以下是我使用实现for循环时的输出

for (i in 1..fileCount)
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (1)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (3)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (4)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (5)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (6)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (8)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (7)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (9)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (11)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (14)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (15)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (16)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (16)...

i当前在所有闭包/任务之间共享。在创建闭包的过程中,您需要将i的副本从当前上下文/当前线程传递到闭包本身。你可以通过使用currying来完成,比如:

def p = task({int locali -> 
    println "${new Date()} Starting parse of schedules (${locali})..."
    // do some parsing here, where I need access to the value i     
}.curry(i))
p.onError({int locali, Throwable t ->
    println "Serious error when loading schedule ${locali}, ${t.getMessage()}"
    t.printStackTrace()
}.curry(i))

另一种方法是从不同的上下文创建闭包(更多的Java方法,基本上就是collect破解的方法):

Closure createTask(final int i) {
  return {
    println "${new Date()} Starting parse of schedules (${i})..."
    // do some parsing here, where I need access to the value i     
  }
}

然后:

def p = task(createTask(i))

我有一个似乎有效的解决方案

,但我不确定这是否是偶然的

编辑:如果我在任务中添加了一个sleep(),正如@cfrick在问题评论中所建议的那样,如下所示,那么这些项目都会被处理(尽管顺序是随机的)。

for循环应替换为collect:

(1..fileCount).collect { i->
    def p = task {
        // this sleep() call is added to check if all tasks are run with the correct parameters
        Thread.currentThread().sleep((long)(1000))
        int taskNumber = i
        println "${new Date()} Starting parse of schedules (${taskNumber})..."
    }
    p.onError {Throwable t ->
        int taskNumber = i
        println "Serious error when loading schedule ${taskNumber}, ${t.getMessage()}"
        t.printStackTrace()
    }
    promiseList << p
}
waitAll(promiseList)

因此,根据@cfrick的建议,代码可以修改为更简单:

promiseList = (1..fileCount).collect { i->
    def p = task {
        println "${new Date()} Starting parse of schedules (${i})..."
        // do work here
    }
    p.onError {Throwable t ->
        println "Serious error when loading schedule ${i}, ${t.getMessage()}"
        t.printStackTrace()
    }
    return p
}
waitAll(promiseList)

最新更新