在单独的线程中运行Websocket,更新类属性



我想实现一个类,它可以在不同的线程中启动各种websocket来检索市场数据并更新类属性。我正在使用kucoin-python-sdk库来达到这个目的。以下内容在spyder中运行良好,但当我将脚本设置为通过conda批处理运行时,它会失败,并多次出现以下错误。非常感谢。

<任务完成名称="任务-4"coro=<ConnectWebsocket_run((已完成,>定义在>路径\lib\site-packages\kucoin\websocket\websocket.py:33>gt;exception=RuntimeError("关闭后无法注册atexit"(>得到an>异常无法在关闭后注册atexit挂起name="任务-3"coro=<ConnectWebsocket_recover_topic_req_msg((运行>在>路径\lib\site-packages\kucoin\websocket\websocket.py:127>gt;wait_for=>取消确定。&gt_重新连接结束

<任务完成名称="任务-7"coro=<ConnectWebsocket_run((完成,定义在>gt;路径\lib\site-packages\kucoin\websocket\websocket。py:33>gt;exception=RuntimeError("关闭后无法注册atexit"(>得到一个>异常无法在关闭后注册atexit挂起name="任务-6"coro=<ConnectWebsocket_recover_topic_req_msg((正在运行>在路径\lib\site-packages\kucoin\websocket\websocket.py:127>gt;wait_for=>取消确定。&gt_重新连接。

因此感到疑惑:

  1. 问题是来自Kucoin包,还是我的线程/asyncio实现不正确
  2. 如何解释Spyder执行和conda在同一环境中的不同行为

Python 3.9.13 | Spyder 5.3.3 | Spyde内核2.3.3 | websocket 0.2.1 | nest asyncio 1.5.6 | kucoin Python 1.0.11

Class_X.py

import asyncio
import nest_asyncio
nest_asyncio.apply()
from kucoin.client import WsToken
from kucoin.ws_client import KucoinWsClient
from threading import Thread
class class_X():
def __init__(self):
self.msg= ""

async def main(self):
async def book_msg(msg):
self.msg = msg
client = WsToken()
ws_client = await KucoinWsClient.create(None, client, book_msg, private=False)
await ws_client.subscribe(f'/market/level2:BTC-USDT')
while True:
await asyncio.sleep(20)

def launch(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.main())

instance = class_X()
t = Thread(target=instance.launch)
t.start()    

批次

call pathanaconda3Scriptsactivate myENV
python "path1class_X.py"
conda deactivate

我想说这是你的实现,但我还没有尝试过用你现在的方式使用该客户端。下面是我正在做的在异步中实现kucoin python的精简框架。

import asyncio
from kucoin.client import WsToken
from kucoin.ws_client import KucoinWsClient
from kucoin.client import Market
from kucoin.client import User
from kucoin.client import Trade
async def main():

async def handle_event(msg):
if '/market/snapshot:' in msg['topic']:
snapshot = msg['data']['data']
## trade logic here using snapshot data

elif msg['topic'] == '/spotMarket/tradeOrders':
print(msg['data'])
else:
print("Unhandled message type")
print(msg)
async def unsubscribeFromPublicSnapsot(symbol):
ksm.unsubscribe('/market/snapshot:' + symbol)

async def subscribeToPublicSnapshot(symbol):
try:
print("subscribing to " + symbol)
await ksm.subscribe('/market/snapshot:' + symbol)
except Exception as e:
print("Error subscribing to snapshot for " + doc['currency'])
print(e)
pubClient = WsToken()
print("creating websocket client")
ksm = await KucoinWsClient.create(None, pubClient, handle_event, private=False)
# for private topics pass private=True
privateClient = WsToken(config["tradeKey"], config["tradeSecret"], config["tradePass"])
ksm_private = await KucoinWsClient.create(None, privateClient, handle_event, private=True)
# Always subscribe to BTC-USDT
await subscribeToPublicSnapshot('BTC-USDT')
# Subscribe to the currency-BTC spot market for each available currency
for doc in tradeable_holdings:
if doc['currency'] != 'BTC':  # Don't need to resubscribe :D
await subscribeToPublicSnapshot(doc['currency'] + "-BTC")
# Subscribe to spot market trade orders
await ksm_private.subscribe('/spotMarket/tradeOrders')

if __name__ == "__main__":
print("Step 1: Kubot initialzied")
print("Step 2: ???")
print("Step 2: Profit")
loopMain = asyncio.get_event_loop()
loopMain.create_task(main())
loopMain.run_forever()
loopMain.close()

正如你可能猜到的;tradable_ holdings";是我感兴趣的符号列表,我已经拥有了这些符号。您还会注意到,我使用的是快照,而不是市场/股票行情订阅。我认为,在100毫秒的更新速度下,它可能会很快陷入延迟和比赛条件——至少在我弄清楚如何处理这些问题之前。所以我选择了每2秒更新一次的快照,而选择了不太活跃的硬币,甚至不经常更新。

不管怎样,我还没有达到交易的目的,但我很快就明白了。

希望这能帮助你弄清楚你的实现,尽管它有所不同。

相关内容

  • 没有找到相关文章