Python并行运行多个程序



假设我有数千个传感器读数被添加到像Redis或Apache Kafka这样的队列中,foreach sensor_id可以获得自己的工作程序,该工作程序对历史读数进行计算。每个sensor_id将被发布到一个新的主题,然后开始流式传输读数。

每个工作程序都将从数据库中读取数据,以实例化其一些可变阈值。除了它的sensor_id和使用sensor _id作为查找关键字检索的变量阈值之外,EACH工作者的代码是相同的。一旦将sensor_id添加到队列中,就会为其分配一个永远运行的工作者。

这样做会更有效率吗:

  1. 多重访问。要启动数千个无限期运行的工作程序吗
  2. 或者";python script.py param1 param2"数千次,而只是更改params来实例化?(我会以编程方式生成一个bash脚本,以包含所有这些逻辑fyi(

我希望所有的工作程序尽可能快地并行运行,这些都是CPU限制的任务,而不是I/O限制的任务。发动所有工人的最佳方式是什么?

澄清:每个Sensor_id每秒都会生成传感器读数,因此对于3000个传感器,每秒会生成3000个事件,队列中有3000个主题。有效负载示例是JSON{sensor_id:"hash",温度:85,转速:1200,…}。我的理想设置是每个工作人员在内存中保持最后200个左右的读数来运行计算。另一种选择是使用一个中央队列,在该队列中,循环工人必须首先建立数据库连接,才能读取从队列中弹出的sensor_id的200个读数,但这需要时间。

我的理解是,Redis列表比发布/订阅更可靠,当单个消息需要由多个消费者消费时,发布/订阅更加适合。如果所有传感器都写入同一个列表,然后您可以让一个由相同工人组成的处理池循环读取该列表,那么您的应用程序也会大大简化。该消息将自然地识别所涉及的传感器,并且当工作人员读取其以前看到的新传感器id的消息时;第一次";通过从数据库中读取相关信息并将其保存在由传感器id键入的字典中来初始化该传感器。最终,该字典可能会有3000个条目这导致了这样一个结论,即池应该初始化一次,其中包含所有工作人员在开始读取消息之前都可以访问的所有3000个条目的字典

但是,如果出于任何原因,所有3000个传感器都必须写入3000个不同的Redis列表(如果一开始就使用Redis列表(,则可以使用以下代码。然后,这个想法将是找到一种以某种方式阅读";同时";从3000个列表中检索可用的消息,并将它们写入工作人员可以从中读取的单个队列,从而简化了工作人员逻辑。此代码基于12.13。David Beazley和Brian K.Jones的Python Cookbook第三版中的Polling Multiple Thread Queues可在此处获得。这已适用于轮询3000个Redis列表,并将读取的项目发送到单个multiprocessing.Queue实例。该代码还包括一个producer工作人员的处理池,模拟传感器读数的创建,这些读数被添加到3000个列表中的一个列表中,而在这里的实际代码中是没有的。但是,3000个PollableList实例必须能够访问负责获取传感器读数的任何代码。我们在这里也只有一个Process从消息队列中读取,这些消息是对象,并打印它们,以便保持打印";秩序井然"事实上,正如我所提到的,你会有一个过程库。

不幸的是,Cookbook中描述的技术似乎对传递给select的列表的大小有限制,因此为了安全起见,我不得不将列表大小限制为500,这意味着我必须将3000个Redis列表分解为6组,每组500个列表。

import redis
import socket
import os
import json
import select
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from functools import partial
class PollableList():
r = redis.Redis()
def __init__(self, idx):
self.idx = idx
self.list_name = f'sensor_{idx}'
# Create a pair of connected sockets
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# Compatibility on non-POSIX systems
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
return self._getsocket.fileno()
def put(self, item):
PollableList.r.rpush(self.list_name, json.dumps(item))
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return json.loads(PollableList.r.lpop(self.list_name).decode())
def producer(lists):
""" emulate the 3000 sensors """
# Feed data to the lists (we won't run indefinitely)
for _ in range(3):
for lst in lists:
lst.put({'id': lst.idx, 'value': lst.idx * 100 + 1})
def consumer(q, list_names):
'''
Consumer that reads data on multiple lists simultaneously
'''
while True:
can_read, _, _ = select.select(list_names, [], [])
for r in can_read:
item = r.get()
q.put(item)
# in actual use case, there would be a pool of workers:
def worker(q):
message_number = 0
while True:
item = q.get()
message_number += 1
print(message_number, item)

def main():
lists = [PollableList(i) for i in range(0, 3000)]
# select cannot handle all 3000 sockets at once:
lists1 = lists[0:500]
lists2 = lists[500:1000]
lists3 = lists[1000:1500]
lists4 = lists[1500:2000]
lists5 = lists[2000:2500]
lists6 = lists[2500:3000]
p0 = Process(target=producer, args=(lists,))
p0.daemon = True
p0.start()
q = Queue()
thread_pool = ThreadPool(6)
thread_pool.map_async(partial(consumer, q), [lists1, lists2, lists3, lists4, lists5, lists6])
# This would in reality be a process pool of workers reading from q:
p1 = Process(target=worker, args=(q,))
p1.daemon = True
p1.start()
# wait for all 9000 messages to be displayed by worker:
input('Hit enter to terminate...n')
# required for Windows:
if __name__ == '__main__':
main()

相关内容

  • 没有找到相关文章

最新更新