我需要并行处理一些多次完成的计算。因此,子进程python函数必须在两次调用之间保持活动状态。
在一个完美的世界里,我需要这样的东西:
class Computer:
def __init__(self, x):
self.x = x
# Creation of quite heavy python objects that cannot be pickled !!
def call(self, y):
return x+y
process = Computer(4) ## NEED MAGIC HERE to keep "call" alive in a subprocess !!
print(process.call(1)) # prints 5 (=4+1)
print(process.call(12)) # prints 16 (=4+12)
我可以遵循这个答案并通过asyncio.subprocess.PIPE
进行通信,但在我的实际用例中,
call
参数是整数列表call
答案是字符串列表
因此,避免手动序列化/反序列化参数和返回值可能很酷。
关于如何保持函数CCD_ 4〃的任何想法;"活着";准备好接听新电话了吗?
这里有一个基于这个的答案,但
- 创建了几个子流程
- 每个子流程都有自己的标识符
- 他们的调用是并行的
- 一个允许交换json而不是纯字节字符串的小层
你好。py
#!/usr/bin/python3
# This is the taks to be done.
# A task consist in receiving a json assumed to be
# {"vector": [...]}
# and return a json with the length of the vector and
# the worker id.
import sys
import time
import json
ident = sys.argv[1]
while True:
str_data = input()
data = json.loads(str_data)
command = data.get("command", None)
if command == "quit":
answer = {"comment": "I'm leaving",
"my id": ident}
print(json.dumps(answer), end="n")
sys.exit(1)
time.sleep(1) # simulates 1s of heavy work
answer = {"size": len(data['vector']),
"my id": ident}
print(json.dumps(answer), end="n")
main.py
#!/usr/bin/python3
import json
from subprocess import Popen, PIPE
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
dprint = print
def create_proc(arg):
cmd = ["./hello.py", arg]
process = Popen(cmd, stdin=PIPE, stdout=PIPE)
return process
def make_call(proc, arg):
"""Make the call in a thread."""
str_arg = json.dumps(arg)
txt = bytes(str_arg + 'n', encoding='utf8')
proc.stdin.write(txt)
proc.stdin.flush()
b_ans = proc.stdout.readline()
s_ans = b_ans.decode('utf8')
j_ans = json.loads(s_ans)
return j_ans
def search(executor, procs, data):
jobs = [executor.submit(make_call, proc, data) for proc in procs]
answer = []
for job in concurrent.futures.as_completed(jobs):
got_ans = job.result()
answer.append(got_ans)
return answer
def main():
n_workers = 50
idents = [f"{i}st" for i in range(0, n_workers)]
executor = ThreadPoolExecutor(n_workers)
# Create `n_workers` subprocesses waiting for data to work with.
# The subprocesses are all different because they receive different
# "initialization" id.
procs = [create_proc(ident) for ident in idents]
data = {"vector": [1, 2, 23]}
answers = search(executor, procs, data) # takes 1s instead of 5 !
for answer in answers:
print(answers)
search(executor, procs, {"command": "quit"})
main()