使用 python 多处理从 mongodb 读取和删除



我有一个mongodb,它有一个集合incoming和一个集合target。工作进程当前正在执行以下操作(简化):

def worker(number):
    """Drain ``db.incoming`` in batches: transform each document, bulk-insert
    the transformed documents into ``db.target`` and bulk-delete the
    originals from ``db.incoming``.

    number: worker id (kept for interface compatibility; unused in the body).
    """
    buffersize = 5          # flush to the DB after this many documents
    readcounter = 0
    documentsToInsert = []  # the original referenced these lists without initialising them
    documentsToDelete = []
    for incoming in db.incoming.find():
        readcounter += 1
        # transform the incoming document (+ other fields after some treatments...)
        documentToInsert = {'keyfield': incoming["keyfield"]}
        # the original appended the raw `incoming` here instead of the transformed doc
        documentsToInsert.append(documentToInsert)
        documentsToDelete.append({'_id': incoming["_id"]})
        # the original compared against an undefined name `readbuffer`
        if readcounter >= buffersize:
            readcounter = 0
            try:
                # _id values coming out of the cursor are already ObjectId
                # instances, so no ObjectId(...) re-wrapping is needed.
                # delete_many() is the modern replacement for remove().
                db.incoming.delete_many(
                    {'_id': {'$in': [doc['_id'] for doc in documentsToDelete]}})
                db.target.insert_many(documentsToInsert, ordered=False)
            finally:
                # The original never cleared the buffers, so every flush
                # re-inserted (and re-deleted) all previously seen documents,
                # which is itself a source of duplicate-key errors.
                documentsToInsert = []
                documentsToDelete = []
    # flush the remainder that did not fill a whole buffer
    if documentsToInsert:
        db.incoming.delete_many(
            {'_id': {'$in': [doc['_id'] for doc in documentsToDelete]}})
        db.target.insert_many(documentsToInsert, ordered=False)

当然,remove和insert_many语句被try/except包围。

由于数据输入的速度比一个工作进程处理的速度快,我需要变得更快,例如把工作分摊到所有CPU上(无论如何都应该这样做以提高效率)。我通过以下代码做到这一点:

if __name__ == "__main__":
    # Spawn one worker process per CPU core, numbered 0..n-1.
    procs = [
        multiprocessing.Process(target=worker, args=(number,))
        for number in range(multiprocessing.cpu_count())
    ]
    for proc in procs:
        proc.start()
    # Wait for every worker to finish before announcing completion.
    for proc in procs:
        proc.join()
    print("=====================FIN=========================")

问题是,当一个进程读取了buffersize个文档时,其他进程会取到相同的文档,导致只有一个进程能成功插入target,其他进程都抛出重复键(duplicate key)异常。这种效果使得实际上只有一个进程在做有用的工作。在没有多进程的情况下,remove/insert_many组合可以很好地工作,我也可以轻松使用更大的缓冲区。

我曾想过在incoming中插入带有额外字段的数据,以确定工作人员number的资格,但这会占用额外的磁盘空间,并占用额外的处理,此外,在生成数据时,我不知道有多少工作人员将处理数据。

我已经尝试过在每个进程中随机休眠一段时间,但这完全不可预测,而且并不能从根本上避免这个错误。

我可以做些什么来使所有线程处理不同的数据?

根据我的评论,我认为使用RabbitMQ之类的消息代理最适合您的用例。使用RabbitMQ和类似的消息代理(我没有使用过0mq),您不需要自己把消息分配给各个线程:只需启动所需数量的工作线程,每个线程都订阅队列,代理会依次把消息分发给它们。

感谢@Belly-Buster提出使用*MQ解耦处理的想法。我已经通过使用ZeroMQ解决了这个问题,它是无代理的,但在本例中,我实现了一个负载平衡代理,它基于ZeroMQ的负载平衡代理示例。客户端正在从数据库中读取数据,而工作人员正在处理他们通过ZeroMQ获得的条目。我试图在代码中加入一些全面的注释,以便明确几点。该代码缺少一些我编写的实用程序类,这些类不属于该解决方案的一部分;这段代码只是为了回答这个问题,希望任何人都觉得它有用。

"""
Original Author: Brandon Carpenter (hashstat) <brandon(dot)carpenter(at)pnnl(dot)gov>
This code was part of the ZeroMQ Tutorial and implements the Load-balancing broker pattern.
Modified by @https://stackoverflow.com/users/2226028/michael
"""
from __future__ import print_function
import multiprocessing
import zmq
import io
import pymongo
from pymongo import MongoClient
import time
from pprint import pprint
import ast
import json
from bson.json_util import dumps
from datetime import datetime
from PairConfig import PairConfig
from PairController import PairController
import ctypes
import sys
from random import randint
# Number of client (DB reader) and worker (DB writer) processes to spawn.
NBR_CLIENTS = 1
NBR_WORKERS = 3
# Load the configuration file
# this is a configuration class which is not documented here
pairConfig=PairConfig("verify.ini")
# multiprocessing shared variables setup
# NOTE: Manager() starts a server process at import time; the two lists are
# proxy objects shared by all worker processes.
manager = multiprocessing.Manager()
insertbuffer=manager.list()
deletebuffer=manager.list()
# process-safe counter of all messages handled across workers
totalcounter=multiprocessing.Value(ctypes.c_int,0)
def client_task(ident):
    """Basic request-reply client using a REQ socket.

    Streams documents from ``db.incoming`` and sends each one as a request
    to the load-balancing broker's frontend, waiting for the reply before
    sending the next (REQ sockets enforce strict send/recv alternation).

    ident: integer used to build a unique ZeroMQ socket identity.
    """
    try:
        client = MongoClient(pairConfig.config.get('db', 'url'))
        db = client.databasename
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Client-{}".format(ident).encode("ascii")
        socket.connect("ipc://frontend.ipc")
        while True:
            incomings = db.incoming.find()
            # Skip a random prefix of the cursor so several client nodes are
            # unlikely to start on the same documents -- this makes it
            # safe(r) to run this on different nodes.
            incomings.skip(randint(randint(1, 500), randint(5000, 500000)))
            for incoming in incomings:
                pair = {'primarykey': incoming["primarykey"], 'value': incoming["value"]}
                # send_string() expects text, not bytes: the original
                # b"%s" % pair raises TypeError on Python 3.
                socket.send_string("%s" % pair)
                reply = socket.recv()
    except KeyboardInterrupt:
        # '\n' was garbled to a literal 'n' in the original paste
        print("\nexit client")
def worker_task(ident, insertbuffer, deletebuffer, mylock):
    """Worker task, using a REQ socket to do load-balancing.

    Receives entries from the broker's backend, buffers them in the shared
    manager lists, and (worker 0 only) periodically flushes insert/delete
    batches to the database under ``mylock``.
    """
    try:
        socket = zmq.Context().socket(zmq.REQ)
        socket.identity = u"Worker-{}".format(ident).encode("ascii")
        socket.connect("ipc://backend.ipc")
        socket.send(b"READY")  # announce availability to the broker
        # this is a helper class which is not documented here
        pairController = PairController(pairConfig)
        while True:
            address, empty, request = socket.recv_multipart()
            with totalcounter.get_lock():
                totalcounter.value += 1
            # recv_multipart() yields bytes; decode before literal_eval.
            # The original called .encode("ascii") on bytes, which fails
            # on Python 3.
            dictToInsert = ast.literal_eval(request.decode("ascii"))
            dictToInsert["last_checked"] = datetime.now()
            insertbuffer.append(dictToInsert)
            deletebuffer.append(dictToInsert["primarykey"])
            # ... do some timely treatment here - a lot of variable time gets burned here ...
            # result will be result1 and result2, for the sake of simplification filled with random numbers
            result1 = randint(1, 10)
            result2 = randint(1, 10)
            # trailing '\r' keeps the status on one console line (it was
            # garbled to a literal 'r' in the original paste)
            sys.stdout.write("%s %s insertbuffer: %d, deletebuffer: %d, totalcounter: %d, b: %s, r: %s            \r" % (
                socket.identity.decode("ascii"), dictToInsert["primarykey"],
                len(insertbuffer), len(deletebuffer), totalcounter.value,
                result1, result2))
            sys.stdout.flush()
            # readbuffer comes from an ini file ... 500 for now; only worker 0
            # flushes, so each batch is written exactly once.
            if ident == 0 and len(insertbuffer[:]) >= int(pairConfig.config.get('verify', 'readbuffer')):
                # `with` guarantees the lock is released even if a DB call
                # raises (the original's acquire()/release() pair did not).
                with mylock:
                    # these 2 methods are inside the pairController class:
                    # basically insert_many() and remove(), each with the
                    # respective buffer as a filter
                    pairController.storePairs("history", insertbuffer[:])
                    pairController.deletePairs("history", deletebuffer[:])
                    # slice assignment empties the manager-backed lists in place
                    insertbuffer[:] = []
                    deletebuffer[:] = []
            socket.send_multipart([address, b"", b"ok"])
    except KeyboardInterrupt:
        # '\n' was garbled to a literal 'n' in the original paste
        print("\nexit worker")
def main():
    """Load balancer main loop.

    Binds a ROUTER frontend for clients and a ROUTER backend for workers,
    spawns the client/worker processes, then routes each client request to
    the next idle worker (ZeroMQ load-balancing broker pattern).
    """
    # Prepare context and sockets
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("ipc://frontend.ipc")
    backend = context.socket(zmq.ROUTER)
    backend.bind("ipc://backend.ipc")
    # Start background tasks
    mylock = multiprocessing.Lock()
    def start(task, *args):
        # daemonized so the children die when the broker process exits
        process = multiprocessing.Process(target=task, args=args)
        process.daemon = True
        process.start()
    for i in range(NBR_CLIENTS):
        start(client_task, i)
    for i in range(NBR_WORKERS):
        start(worker_task, i, insertbuffer, deletebuffer, mylock)
    # Initialize main loop state
    count = NBR_CLIENTS
    workers = []  # queue of idle worker identities
    poller = zmq.Poller()
    # Only poll for requests from backend until workers are available
    poller.register(backend, zmq.POLLIN)
    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            if not workers:
                # Poll for clients now that a worker is available
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If client reply, send rest back to frontend
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
                count -= 1
        if frontend in sockets:
            # Get next client request, route to last-used worker
            client, empty, request = frontend.recv_multipart()
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available
                poller.unregister(frontend)
    # Clean up
    # NOTE(review): the while loop above never breaks, so this cleanup is
    # unreachable; shutdown happens via KeyboardInterrupt in the caller.
    backend.close()
    frontend.close()
    context.term()
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # '\n' was garbled to a literal 'n' in the original paste; it also
        # moves the cursor past the workers' \r status line on exit.
        print("\nexit main")

最新更新