gRPC 对象创建不会关闭的额外进程



我用gRPC服务器开发了这个应用程序。我的申请要点是:

  1. gRPC Servicer(下面的代码中的类 DexFxServicer(具有由 gRPC 客户端在外部调用的传输方法。

  2. 传输方法从 hostList 为不同的主机创建多个通道和存根。

  3. 进一步的应用程序创建进程池并启动它。

  4. 每个子进程为其自己的存根调用 gRPC 方法 SendHostListAndGetMetrics,并接收响应迭代器。

此代码运行良好,应用程序调用 Transmission 方法并从进程池接收所有需要的结果。但我注意到,当外部 gRPC 客户端多次调用传输方法时,此代码没有关闭其某些子进程。它导致额外的非封闭式流程创建,如 htop 所示。 当我尝试通过 channel.close(( 方法关闭 gRPC 通道时,正在更密集地创建额外的进程。

蟒蛇 2.7.12 grpcio==1.16.1 grpcio-tools==1.16.1 优麒麟 16.04.6 LTS 4.4.0-143-通用

from concurrent import futures
import sleep
import grpc
import sys
import cascade_pb2
import cascade_pb2_grpc
import metrics_pb2
import metrics_pb2_grpc
from multiprocessing import Pool
class DexFxServicer(cascade_pb2_grpc.DexFxServicer):
def __init__(self, args):
self.args = args
def Transmit(self, request, context):
entrypoint = request.sender.host_address # entrypoint is a string
hostList = []   # hostList is a list of strings
for rec in request.sender.receiver:
hostList.append(rec.host_address)
channels = {}
stubs = {}
for host in hostList:
try:
channels[host] = grpc.insecure_channel('%s:%d' % (host, self.args.cascadePort))
except Exception as e:
print(e)
sys.exit(0)
else:
stubs[host] = metrics_pb2_grpc.MetricsStub(channels[host])
def collect_metrics(host):
mtrx = []
hosts = (metrics_pb2.Host(hostname = i) for i in hostList + [entrypoint])
for i in stubs[host].SendHostListAndGetMetrics(hosts):
mtrx.append(i.mtrx)
return mtrx
pool = Pool(len(hostList))
results = pool.map(collect_metrics, hostList)
pool.close()
pool.terminate()
pool.join()
# Return the iterator of the results

我希望看到不会创建额外非关闭进程的代码。请告诉我在这种情况下该怎么做。

该问题已通过将 grpcio 版本更新到 1.23.0 得到解决。 gRPC 问题

相关内容

  • 没有找到相关文章

最新更新