Python多处理管道的异常行为



我有两个进程,其中on运行主循环,另一个通过websocketapi获取最新价格,这两个进程可以与管道对象通信。所以我的核心进程会向另一个进程发送一条请求消息,然后会收到一条包含最新价格的消息。另一件事是,除非websocketapi进程做出响应,否则程序将不得不停止(最好是这样(,这会带来以下问题。一旦程序运行了几个小时,它就会在core发送请求并等待消息发送回来后挂起/冻结,并且一直保持这种状态。由于我的代码很长,我创建了一个示例代码,你可以在下面找到,我也没有添加websocket代码,因为它与这里无关。

from time import sleep
from multiprocessing import Pipe, Process

class websocket_stuff:
def __init__(self, pipe: Pipe):
self.process_pipe = pipe
# Websocket functions are not added since its not necessary for demonstration
def WS_run_forever(self):
# here i do the WS infinite loop, and as part of the loop i do the following:
while True:
if self.process_pipe.poll() is True:
rec = self.process_pipe.recv()
if (rec is not None) and (isinstance(rec, dict) is True):
if ('request' in rec) and (rec['request'] == 'candle'):
self.process_pipe.send(self.process_pipe, {'message': 'candle', 'ticker': rec['ticker'],
'data': 'OHLC data'})

class core:
def __init__(self):
self.name = 'ticker'
self.WS_pipe, kid = Pipe()
self.WS_obj = websocket_stuff(kid)
self.WS_process = Process(target=self.WS_obj.WS_run_forever)
self.WS_process.start()
def get_data(self):
self.WS_pipe.send({'request': 'candle', 'ticker': self.name})
twe = 0
while True:
# It hangs/freezes here
if self.WS_pipe.poll(2) is True:
rec = self.WS_pipe.recv()
break
else:
twe += 1
if twe >= 3:
self.WS_pipe.send({'request': 'candle', 'ticker': self.name})
sleep(0.0001)
if ('message' in rec) and (rec['message'] == 'candle') and (rec['ticker'] == self.name):
return rec['data']
return None

if __name__ == '__main__':
c = core()
while True:
data = c.get_data()
print(data)
sleep(1)

我检查了进程,看起来它们仍然在运行,我也没有在进程上得到任何异常,所以这不可能是错误。在跟踪该问题时,我发现在主进程发送有问题的管道后,另一个进程没有收到它请求的数据,因此出现了问题!我唯一能想到的就是管子出了问题。有人能帮我弄清楚发生了什么事吗?

它看起来像"不需要websockets的东西";是处理停止的地方,这将是帮助您的理想点-不需要它们完成-但.send行中的固定值没有帮助:您可以编写一个逐步结束的方法,在该方法中,您可以使用正在使用的I/O方法检索值。

除此之外,还可以在父进程中添加一些控制,如果超时(比如20秒-1分钟(,只需终止网络进程并生成一个新进程。实际上,这可能比尝试在子进程中重新尝试websocket方法更简单——只需在父进程中添加一些时间核算变量即可:

from time import sleep
from multiprocessing import Pipe, Process , terminate
(...)
import time
timeout = 20
class core:
def __init__(self):
self.name = 'ticker'
self.WS_process = None
self.reset_child()

def reset_child(self):
if self.WS_process:
terminate(self.WS_process)
self.WS_pipe, kid = Pipe()
self.WS_obj = websocket_stuff(kid)
self.WS_process = Process(target=self.WS_obj.WS_run_forever)
self.WS_process.start() 

def get_data(self):
self.WS_pipe.send({'request': 'candle', 'ticker': self.name})
twe = 0
last_rec = time.time()
last_sent_data = None
while True:
# It hangs/freezes here
if self.WS_pipe.poll(2):
rec = self.WS_pipe.recv()
break
else:
twe += 1
if twe >= 3:
self.WS_pipe.send(last_sent_data:=({'request': 'candle', 'ticker': self.name}))
sleep(0.0001)
if time.time() - last_rec > timeout:
self.reset_child()
self.WS_pipe.send(last_sent_data)
last_rec = time.time()
if ('message' in rec) and (rec['message'] == 'candle') and (rec['ticker'] == self.name):
return rec['data']
return None

if __name__ == '__main__':
c = core()
while True:
data = c.get_data()
print(data)
sleep(1)

(请注意,在检查条件时,不需要使用is True:如果表达式无论如何都是truthy,则if语句将执行,并且"is True"不仅是多余的:如果表达式导致其他有效的"truthy"值,如非零数或包含内容的序列,则可能会失败。(

最新更新