在 Flask 上的应用程序中,我在一批文件中使用多处理 - 用户上传一个包含许多 pdf 文件的.zip - 上传后,在数据库上为每个文件创建一个新实体,然后启动一个线程并调用多处理池,以便每个文件启动一个与 Google 云服务(如 Google Storage 和 Google Datastore(交互的进程。
import threading
import multiprocessing
import sys
class ProcessMulti(threading.Thread):
def __init__(self, files_ids):
self.files_ids = files_ids
super().__init__()
def run(self):
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
for i, _ in enumerate(pool.imap_unordered(process_one, self.files_ids), 1):
sys.stderr.write('rdone {0:%}'.format(i/len(self.files_ids)))
def process_one(file_id):
print("Process started by {}".format(file_id))
file = File(file_id)
file.process()
print("Process finished by {}".format(file_id))
return file.id
在文件对象中,与 Google 数据存储和 Google Storage 有一些琐碎的交互 - 例如从存储桶中重新编辑文件或修改数据。一切都在当地顺利进行...但是在使用SSL连接的生产中,当尝试启动进程时,会抛出以下错误,并且根本没有发生任何事情:
Process started by 5377634535997440
E1004 15:49:32.711329522 32255 ssl_transport_security.cc:476] Corruption detected.
E1004 15:49:32.711356181 32255 ssl_transport_security.cc:452] error:100003fc:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_RECORD_MAC
E1004 15:49:32.711361146 32255 secure_endpoint.cc:208] Decryption error: TSI_DATA_CORRUPTED
任何人都知道导致此错误的原因吗?我做了一些研究,发现了一些与SSL套接字过载相关的错误......但我不知道要解决这个问题或替代具有类似性能的多处理。谢谢。
对我们有用的另一种解决方案是在父进程中将GRPC_POLL_STRATEGY
设置为"轮询":
os.environ['GRPC_POLL_STRATEGY']='poll'
我们在Firebase中使用多线程时收到Decryption error: TSI_DATA_CORRUPTED
错误。
来源: https://github.com/grpc/grpc/issues/28557
我最终将多处理和线程操作交换到芹菜任务队列,因为在连接到 gcloud 服务时存在一些我无法克服的线程安全性问题。Celery 实现是我的应用程序上许多多个异步任务的良好解决方案。
#Import celery instance with app context already set
from main_app import celery
@celery.task
def process_one(file_id):
print("Process started by {}".format(file_id))
file = File(file_id)
file.process()
print("Process finished by {}".format(file_id))
return file.id