异步get()不带中间累加器



我有以下循环。我简化了代码。inner((在类似的循环中解析同一个文件,当然不需要.remote((调用

def outer(self,file):
rv = []
with open(file,'r') as f :
acc1, acc2 = [],[]
for i,line in  enumerate(f) :
if i % 10 == 0 : print(f'> {i}  ', end="n")
if i > 25 : break
outeri,txt = line.split(':')
abc = ClassX.inner.remote(txt, file, int(outeri))
acc2.append(abc) #lst of obj-refs
acc1.append(int(outeri)) 
rv = [z for z in zip(acc1, ray.get([a for a in acc2])) ]
return rv

我想异步地将数据收集到rv中,我在这里这样做,但没有中介"acc2"。

我有两个问题:

  1. 相反/除了收集数据外,我还想异步执行一些SQL代码,但随着结果的出现。

  2. print((进度不是逐步打印的,而是在最后一次打印。我必须把它移到";inner((";

试图理解并行迭代器,但似乎很难/不可行如何在readline之后将步骤压缩到.remote((调用

编辑:根据澄清更改答案。

为了按对象引用到达的顺序处理对象引用,您需要在对象引用返回时使用ray.wait来获取对象引用,然后只对准备好的对象引用调用ray.get

def outer(file):
outer_is = {}
unfinished_refs = []
with open(file, "r"):
for i, line in enumerate(f):
outeri, txt = line.split(":")
ref = ClassX.inner.remote(txt, file, int(outeri))
outer_is[ref] = int(outeri)
unfinished_refs.append(ref)
# This part will get and process tasks as they finish
while len(unfinished_refs) > 0:
finished, unfinished_refs = ray.wait(unfinished_refs)
outeri = outer_is[finished[0]]
result = ray.get(finished[0])
### Process the result here ###

最新更新