我用gRPC服务器开发了这个应用程序。我的申请要点是:
-
gRPC Servicer(下面的代码中的类 DexFxServicer(具有由 gRPC 客户端在外部调用的传输方法。
-
传输方法从 hostList 为不同的主机创建多个通道和存根。
-
进一步的应用程序创建进程池并启动它。
-
每个子进程为其自己的存根调用 gRPC 方法 SendHostListAndGetMetrics,并接收响应迭代器。
此代码运行良好,应用程序调用 Transmission 方法并从进程池接收所有需要的结果。但我注意到,当外部 gRPC 客户端多次调用传输方法时,此代码没有关闭其某些子进程。它导致额外的非封闭式流程创建,如 htop 所示。 当我尝试通过 channel.close(( 方法关闭 gRPC 通道时,正在更密集地创建额外的进程。
蟒蛇 2.7.12 grpcio==1.16.1 grpcio-tools==1.16.1 优麒麟 16.04.6 LTS 4.4.0-143-通用
from concurrent import futures
import sleep
import grpc
import sys
import cascade_pb2
import cascade_pb2_grpc
import metrics_pb2
import metrics_pb2_grpc
from multiprocessing import Pool
class DexFxServicer(cascade_pb2_grpc.DexFxServicer):
def __init__(self, args):
self.args = args
def Transmit(self, request, context):
entrypoint = request.sender.host_address # entrypoint is a string
hostList = [] # hostList is a list of strings
for rec in request.sender.receiver:
hostList.append(rec.host_address)
channels = {}
stubs = {}
for host in hostList:
try:
channels[host] = grpc.insecure_channel('%s:%d' % (host, self.args.cascadePort))
except Exception as e:
print(e)
sys.exit(0)
else:
stubs[host] = metrics_pb2_grpc.MetricsStub(channels[host])
def collect_metrics(host):
mtrx = []
hosts = (metrics_pb2.Host(hostname = i) for i in hostList + [entrypoint])
for i in stubs[host].SendHostListAndGetMetrics(hosts):
mtrx.append(i.mtrx)
return mtrx
pool = Pool(len(hostList))
results = pool.map(collect_metrics, hostList)
pool.close()
pool.terminate()
pool.join()
# Return the iterator of the results
我希望看到不会创建额外非关闭进程的代码。请告诉我在这种情况下该怎么做。
该问题已通过将 grpcio 版本更新到 1.23.0 得到解决。 gRPC 问题