我是python中处理多处理、多线程等的新手。
我正在尝试使用multiprocessing
从我的加密货币交易所(API Docs Here(订阅多个Websocket流。但是,当我运行下面的代码时,我只收到ticker information
,而没有收到order book updates
。
如何修复代码以获得这两种信息
在multiprocessing
上运行时,似乎只有一个websocket在工作,这是什么原因
(当我分别运行函数ws_orderBookUpdates()
和ws_tickerInfo()
时,不使用multiprocessing
,它单独运行很好,所以这不是交易所的问题。(
import websocket
import json
import pprint
from datetime import datetime
import time
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Multiprocessing two functions
if __name__ == '__main__':
import multiprocessing as mp
mp.Process(target=ws_tickerInfo(), daemon=True).start()
mp.Process(target=ws_orderBookUpdates(), daemon=True).start()
更新
您已经创建了两个守护进程。当所有非守护进程终止时,它们将终止,在本例中,这是主进程,在创建守护进程后立即终止。你很幸运,即使其中一个过程也有机会产生产出,但为什么要冒险呢不要使用dameon进程。相反:
if __name__ == '__main__':
import multiprocessing as mp
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion
p2.join() # wait for completion
但真正的问题就在我们面前,我们都错过了!你有:
p1 = mp.Process(target=ws_tickerInfo(), daemon=True)
p2 = mp.Process(target=ws_orderBookUpdates(), daemon=True)
应该是什么时候:
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
看到区别了吗?实际上,您并没有将函数ws_tickerInfo
传递给Process
,而是调用ws_tickerInfo
并试图传递返回值,如果函数返回(它没有返回(,则返回值将是无意义的None
。因此,您甚至从未执行过第二个进程创建语句。
虽然Ctrl-C中断处理程序可能不起作用,但您可能也使用了多线程而不是多处理。还应该有一个终止程序的机制。我添加了一些代码来检测Ctrl-C,并在输入Ctrl-C时终止。此外,您使用了self
作为函数参数,就好像该函数实际上是一个类方法一样,但事实并非如此。这不是一种好的编程风格。以下是更新来源:
import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_ticker_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message, prev=None):
print(f"Ticker Info, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe",
"params": {'channel': "lightning_board_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp, message):
print(f"Order Book, Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,
on_open=on_open,
on_message=on_message,
on_close=on_close)
ws.run_forever()
def handle_ctrl_c(signum, stack_frame):
sys.exit(0)
if __name__ == '__main__':
import multiprocessing as mp
signal.signal(signal.SIGINT, handle_ctrl_c) # terminate on ctrl-c
print('Enter Ctrl-C to terminate.')
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion (will never happen)
p2.join() # wait for completion (will never happen)
使用多线程
if __name__ == '__main__':
import threading
t1 = threading.Thread(target=ws_tickerInfo, daemon=True)
t2 = threading.Thread(target=ws_orderBookUpdates, daemon=True)
t1.start()
t2.start()
input('Hit enter to terminate...n')