如何使用Muultiprocessing订阅多个Websocket流



我是python中处理多处理、多线程等的新手。

我正在尝试使用multiprocessing从我的加密货币交易所(API Docs Here(订阅多个Websocket流。但是,当我运行下面的代码时,我只收到ticker information,而没有收到order book updates

如何修复代码以获得这两种信息
multiprocessing上运行时,似乎只有一个websocket在工作,这是什么原因

(当我分别运行函数ws_orderBookUpdates()ws_tickerInfo()时,不使用multiprocessing,它单独运行很好,所以这不是交易所的问题。(

import websocket
import json
import pprint
from datetime import datetime
import time
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()

# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()

# Multiprocessing two functions
if __name__ == '__main__':
import multiprocessing as mp
mp.Process(target=ws_tickerInfo(), daemon=True).start()
mp.Process(target=ws_orderBookUpdates(), daemon=True).start()

更新

您已经创建了两个守护进程。当所有非守护进程终止时,它们将终止,在本例中,这是主进程,在创建守护进程后立即终止。你很幸运,即使其中一个过程也有机会产生产出,但为什么要冒险呢不要使用dameon进程。相反:

if __name__ == '__main__':
import multiprocessing as mp
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion
p2.join() # wait for completion

但真正的问题就在我们面前,我们都错过了!你有:

p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)

应该是什么时候:

p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)

看到区别了吗?实际上,您并没有将函数ws_tickerInfo传递给Process,而是调用ws_tickerInfo并试图传递返回值,如果函数返回(它没有返回(,则返回值将是无意义的None。因此,您甚至从未执行过第二个进程创建语句。

虽然Ctrl-C中断处理程序可能不起作用,但您可能也使用了多线程而不是多处理。还应该有一个终止程序的机制。我添加了一些代码来检测Ctrl-C,并在输入Ctrl-C时终止。此外,您使用了self作为函数参数,就好像该函数实际上是一个类方法一样,但事实并非如此。这不是一种好的编程风格。以下是更新来源:

import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()

# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
def handle_ctrl_c(signum, stack_frame):
sys.exit(0)
if __name__ == '__main__':
import multiprocessing as mp
signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
print('Enter Ctrl-C to terminate.')
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion (will never happen)
p2.join() # wait for completion (will never happen)

使用多线程

if __name__ == '__main__':
import threading
t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
t1.start()
t2.start()
input('Hit enter to terminate...n')

相关内容

  • 没有找到相关文章

最新更新