我有以下循环。我简化了代码。inner((在类似的循环中解析同一个文件,当然不需要.remote((调用
def outer(self,file):
rv = []
with open(file,'r') as f :
acc1, acc2 = [],[]
for i,line in enumerate(f) :
if i % 10 == 0 : print(f'> {i} ', end="n")
if i > 25 : break
outeri,txt = line.split(':')
abc = ClassX.inner.remote(txt, file, int(outeri))
acc2.append(abc) #lst of obj-refs
acc1.append(int(outeri))
rv = [z for z in zip(acc1, ray.get([a for a in acc2])) ]
return rv
我想异步地将数据收集到rv中,我在这里这样做,但没有中介"acc2"。
我有两个问题:
相反/除了收集数据外,我还想异步执行一些SQL代码,但随着结果的出现。
print((进度不是逐步打印的,而是在最后一次打印。我必须把它移到";inner((";
试图理解并行迭代器,但似乎很难/不可行如何在readline之后将步骤压缩到.remote((调用
编辑:根据澄清更改答案。
为了按对象引用到达的顺序处理对象引用,您需要在对象引用返回时使用ray.wait
来获取对象引用,然后只对准备好的对象引用调用ray.get
。
def outer(file):
outer_is = {}
unfinished_refs = []
with open(file, "r"):
for i, line in enumerate(f):
outeri, txt = line.split(":")
ref = ClassX.inner.remote(txt, file, int(outeri))
outer_is[ref] = int(outeri)
unfinished_refs.append(ref)
# This part will get and process tasks as they finish
while len(unfinished_refs) > 0:
finished, unfinished_refs = ray.wait(unfinished_refs)
outeri = outer_is[finished[0]]
result = ray.get(finished[0])
### Process the result here ###