Calling the same subprocess python function multiple times



I need to run, in parallel, some computations that are performed many times. The subprocess python function must therefore stay alive between two calls.

In a perfect world I would need something like this:

class Computer:
    def __init__(self, x):
        self.x = x
        # Creation of quite heavy python objects that cannot be pickled !!

    def call(self, y):
        return self.x + y

process = Computer(4)  ## NEED MAGIC HERE to keep "call" alive in a subprocess !!
print(process.call(1))   # prints 5   (= 4 + 1)
print(process.call(12))  # prints 16  (= 4 + 12)

I could follow this answer and communicate through asyncio.subprocess.PIPE, but in my actual use case,

  • the call arguments are lists of integers
  • the call answers are lists of strings

so it would be nice to avoid serializing/deserializing the arguments and return values by hand.

Any idea of how to keep the function call "alive" in a subprocess, ready to receive new calls?
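
For reference, here is a minimal sketch of what that "magic" could look like with the standard multiprocessing module (my own assumption, not a full solution: it relies on the heavy objects being built inside the worker process, so they are never pickled, while the exchanged values, ints and strings, pickle fine and the Pipe serializes them automatically):

import multiprocessing as mp

def worker(conn, x):
    # The quite heavy, unpicklable objects are created here, inside
    # the child process, so they never have to cross the pipe.
    while True:
        y = conn.recv()
        if y is None:           # sentinel: shut the worker down
            break
        conn.send(x + y)        # ints/strings are picklable

class Computer:
    def __init__(self, x):
        self._conn, child_conn = mp.Pipe()
        self._proc = mp.Process(target=worker, args=(child_conn, x))
        self._proc.start()

    def call(self, y):
        self._conn.send(y)
        return self._conn.recv()

    def close(self):
        self._conn.send(None)
        self._proc.join()

if __name__ == "__main__":
    process = Computer(4)
    print(process.call(1))   # prints 5
    print(process.call(12))  # prints 16
    process.close()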

Here is an answer, based on this one, but where:

  • several subprocesses are created,
  • each subprocess has its own identifier,
  • their calls run in parallel,
  • a small layer allows exchanging JSON instead of plain byte strings.

hello.py

#!/usr/bin/python3

# This is the task to be done.
# A task consists of receiving a json assumed to be
# {"vector": [...]}
# and returning a json with the length of the vector and
# the worker id.
import sys
import time
import json

ident = sys.argv[1]

while True:
    str_data = input()
    data = json.loads(str_data)
    command = data.get("command", None)
    if command == "quit":
        answer = {"comment": "I'm leaving",
                  "my id": ident}
        # flush so the parent's readline() gets the answer immediately
        print(json.dumps(answer), end="\n", flush=True)
        sys.exit(1)
    time.sleep(1)   # simulates 1s of heavy work
    answer = {"size": len(data['vector']),
              "my id": ident}
    print(json.dumps(answer), end="\n", flush=True)
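
For reference, one request/answer cycle on the worker's stdin/stdout looks like this (one JSON object per line; "3st" is one of the identifiers assigned by main.py below):

{"vector": [1, 2, 23]}
{"size": 3, "my id": "3st"}

and the shutdown exchange:

{"command": "quit"}
{"comment": "I'm leaving", "my id": "3st"}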

main.py

#!/usr/bin/python3
import json
from subprocess import Popen, PIPE
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

dprint = print

def create_proc(arg):
    cmd = ["./hello.py", arg]
    process = Popen(cmd, stdin=PIPE, stdout=PIPE)
    return process

def make_call(proc, arg):
    """Make the call in a thread."""
    str_arg = json.dumps(arg)
    txt = bytes(str_arg + '\n', encoding='utf8')
    proc.stdin.write(txt)
    proc.stdin.flush()
    b_ans = proc.stdout.readline()
    s_ans = b_ans.decode('utf8')
    j_ans = json.loads(s_ans)
    return j_ans

def search(executor, procs, data):
    jobs = [executor.submit(make_call, proc, data) for proc in procs]
    answer = []
    for job in concurrent.futures.as_completed(jobs):
        got_ans = job.result()
        answer.append(got_ans)
    return answer

def main():
    n_workers = 50
    idents = [f"{i}st" for i in range(0, n_workers)]
    executor = ThreadPoolExecutor(n_workers)
    # Create `n_workers` subprocesses waiting for data to work with.
    # The subprocesses are all different because they receive different
    # "initialization" ids.
    procs = [create_proc(ident) for ident in idents]
    data = {"vector": [1, 2, 23]}
    answers = search(executor, procs, data)  # takes 1s instead of 50!
    for answer in answers:
        print(answer)
    search(executor, procs, {"command": "quit"})

main()
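
To get back the Computer-like API from the question, the Popen/make_call logic above can also be wrapped in a small class; a minimal sketch (the ./hello.py worker and its JSON protocol are exactly the ones above, only the class is new):

import json
from subprocess import Popen, PIPE

class Computer:
    """Keep one ./hello.py worker alive and talk JSON to it."""

    def __init__(self, ident):
        self.proc = Popen(["./hello.py", ident], stdin=PIPE, stdout=PIPE)

    def _send(self, obj):
        self.proc.stdin.write(bytes(json.dumps(obj) + '\n', encoding='utf8'))
        self.proc.stdin.flush()

    def _recv(self):
        return json.loads(self.proc.stdout.readline().decode('utf8'))

    def call(self, vector):
        self._send({"vector": vector})
        return self._recv()

    def close(self):
        # Ask the worker to quit, read its goodbye, wait for the process.
        self._send({"command": "quit"})
        answer = self._recv()
        self.proc.wait()
        return answer

Usage then looks like the question's ideal:

comp = Computer("0st")
print(comp.call([1, 2, 23]))   # {'size': 3, 'my id': '0st'}
comp.close()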
