在 pika 消费者中运行多处理/多线程并将数据定向到特定数据帧



我是Python多线程/处理和RabbitMQ的新手。基本上我有一个RabbitMQ消费者,它可以为我提供实时的医院数据。每条消息都包含每位患者患者的生命体征。我需要为每个患者存储至少五条这样的消息,以便运行我的逻辑并用于触发警报。此外,由于患者数量未知,我正在考虑多线程或多处理,以保持我的警报几乎实时并扩大规模。我的方法是为每个患者创建一个全局数据帧,然后将与该患者相关的消息附加到数据帧中。但是现在我在创建多线程/进程并将数据发送到相应的患者数据帧时遇到问题。这是我的代码


bed_list=[]
thread_list=[]
bed_df={}
alarms=0
def spo2(body,bed):
body_data= body.decode()
print(body_data)
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
if bed_id=bed:
primary_attributes= json_normalize(packet)
'''some logic'''
global bed_df
bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
print(bed_df[bed_id])
''' some other calcuation'''
phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
bed_df[bed_id]= bed_df[bed_id].tail(4)  # resets the size of the dataframe 

def callback(ch, method, properties, body):
body_data= body.decode()
packet= json.loads(body_data)
bed_id= packet['beds'][0]['bedId']
print(bed_id)
global bed_list
if bed_id not in bed_list:
bed_list.append(bed_id)

#pseudo code
for bed in bed_list:
proc = Process(target=spo2, args=(bed,))
procs.append(proc)
proc.start()

我无法找到一种方法,我可以为每个患者创建一个线程/进程(bed_id(,以便每当我收到该bed_id患者的消息时,我都可以将其定向到该线程。我已经检查了队列,但文档对于实现这种情况不是很清楚。

在你走这条路之前,你应该评估一下这是否必要。一个重要的限制是 rabbitmq 带宽。

构建一个单线程应用程序,并开始向其提供合成的 rabbitmq 消息。增加味精/秒的速度,直到它再也跟不上。

如果这个比率比实际可能发生的要高得多,你就完成了。

如果没有,则开始分析应用程序,以查找应用程序的哪些部分花费的时间最多。这些是你的瓶颈。 只有当你知道瓶颈是什么时,你才能查看相关的代码并思考如何改进它们。

请注意,multiprocessingthreading执行不同的事情并具有不同的应用程序。如果您的应用程序受到它可以执行的计算量的限制,那么multiprocessing可以通过将计算分散到多个 CPU 内核来提供帮助。请注意,这仅在计算彼此独立时才有效。如果您的应用程序花费大量时间等待 I/O,threading可以帮助您在一个线程中执行计算,而另一个线程正在等待 I/O。

但就复杂性而言,两者都不是免费的。例如,对于threading,您必须使用锁保护数据帧的读取和写入,以便一次只有一个线程可以读取或修改所述数据帧。使用multiprocessing,您必须将数据从工作进程发送回父进程。

在这种情况下,我认为multiprocessing将是最有用的。您可以设置许多流程,每个流程负责部分床位/患者。如果 rabbitmq 可以有多个侦听器,则可以让每个工作进程仅处理来自它负责的患者的消息。否则,您必须将消息分发到适当的进程。现在,每个工作进程都处理许多患者的消息(并保留数据帧(。当根据对数据进行的计算触发警报时,工作人员只需向父进程发送一条消息,详细说明患者的标识符和警报的性质。

最新更新