请任何人建议最佳实践/方法:
组合2个线程:
(线程1=Dataprovider1_live_api,螺纹2=Dataprovider2_live_api(
使用异步websocket客户端,以便我可以发布Thread1&威胁2以最佳方式?
我相信目前的方法肯定是不正确的;我把它弄糊涂了/只让它工作了几十秒&容易表现不佳&崩溃给定了我如何合并线程&异步websocket函数
欢迎任何建议、想法或提示;非常感谢:(
我还想添加一个
await webscoket.recv()
但我无法让它工作,因为它已经很乱了。如果有人知道我如何同时接收websocket消息,这将非常非常有帮助,甚至是最佳实践。
我也愿意将下面的websocket函数插入到我的dataprovider1_api_loop&dataprovider2_api_loop线程,如果这是正确的做法?
线程1&Thread2使用在Thread2&然后通过websocket客户端发布。
在func df_random((下面是一个很好的代理,可以代理线程1&螺纹2.
def df_random():
summary_df_large = pd.DataFrame()
ccy_bbgn = ['KWN','IRN','NTN','IHN','PPN','CCN']
while True:
time.sleep(0.9)
int_ = random.randint(1,9)
int_2 = random.randint(1,9)
# for ccy in ccy_bbgn:
ccy = ccy_bbgn[0]
df = pd.DataFrame([{"CCY":ccy, "TENOR":"Tom Fix","BID": int_, "ASK": int_2,"MID":7, "SPREAD": 0.8, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0},
{"CCY":ccy,"TENOR":"1W1M","BID": 1, "ASK": 5,"MID":7, "SPREAD": 0.07, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0},
{"CCY":ccy,"TENOR":"1x2","BID": 1, "ASK": 5,"MID":7, "SPREAD": 0.07, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0},
{"CCY":ccy,"TENOR":"1x3","BID": 1, "ASK": 5,"MID":7, "SPREAD": 0.07, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0},
{"CCY":ccy,"TENOR":"1x6","BID": 1, "ASK": 5,"MID":7, "SPREAD": 0.07, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0},
{"CCY":ccy,"TENOR":"1x9","BID": 1, "ASK": 5,"MID":7, "SPREAD": 0.1, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0},
{"CCY":ccy,"TENOR":"1x12","BID": 1, "ASK": 5,"MID":7, "SPREAD": 0.1, "SKEW": 0,"BID_override":"","ASK_override": "", "BID_output": 5,"ASK_output": 6, "MID_output": 8, "POINTS_SCALE":0,"BB MID": 8, "CHK": 8, "DAYS":0}])
print(df.head())
print('------------')
yield df
这是创建两个线程的数据类,这两个线程通过队列对象对共享数据帧执行操作(每个线程通过api接收单独的和不同的新数据(
import asyncio
import websockets
import threading
import datetime
import json
import threading
import time
import random
import pandas as pd
import datetime
import queue
import sys, os
import numpy as np
class data:
def start_publish(self):
self.df_1 = pd.DataFrame(columns=['Instrument','BID','ASK'])
q = queue.Queue() # used for the dataprovider1
q_summary = queue.Queue() # used for the dataprovider2
lock = threading.Lock() # Create lock obj
self.t_1 = threading.Thread(target=self.dataprovider1_api_loop, args=(q,lock),name=f"dataprovider1",daemon = True)
self.t_2 = threading.Thread(target=self.dataprovider2_api_loop, args=(q,q_summary, lock),name=f"dataprovider2",daemon = True)
#Start threads :
self.t_1.start() #dataprovider1
self.t_2.start() #dataprovider2
该数据类被初始化&然后通过:调用
#Lines 450 & 451
data_obj = data()
data_obj.start_publish()'
然后,我们定义一个异步websocket函数,并运行事件
#Line 453 onwards
async def run_websocket():
web_socket_host = 'ws://ipaddres:port'
async with websockets.connect(web_socket_host) as websocket:
while True:
try:
#sumary websocket publish#
json_summary_arr = json.dumps({"SUM":data_obj.updated_summary_df.to_dict('records'),"DATETIME":datetime.datetime.now()},default=str)
json_arr = json.dumps({"data_prov1":data_obj.df_bbg[["TENOR","BID","ASK",'TIME']].to_dict('records'),"DATETIME":datetime.datetime.now()},default=str)
await websocket.send(json_arr)
#sumary websocket publish#
await websocket.send(json_summary_arr)
except Exception as e:
print(e)
except websockets.ConnectionClosed as exc:
print('Connection closed while sending')
# self.log.debug('Connection closed while sending')
disconnected = Disconnected(exc.code)
# self.jobs.close(Result(disconnected))
raise disconnected from exc
asyncio.get_event_loop().run_until_complete(run_websocket())
print('Done')
#End
不幸的是,我们收到了以下错误,我认为这是由于超时问题,我不确定如何修复,请提供任何建议、想法或提示;非常感谢:(
我的websocket服务器在一台独立的机器上;运行在C#中,我很确定不是websocket服务器没有引起问题。
sent 1011 (unexpected error) keepalive ping timeout; no close frame received
第一:有3个主要异常与关闭的连接场合有关使用websockets python包时
1. websockets.exceptions.ConnectionClosedError
2. websockets.exceptions.ConnectionClosedOK
3. websockets.exceptions.ConnectionClosed
2'nd:在处理梯子时,你必须考虑异常的顺序大多数独特的异常(如上面的3(可以在通用"异常"上被捕获;例外情况";将异常处理从最特定的重新排序为最通用的(s.o.上有很多引用,我不是第一个提到它的(。第一个也是最具体的是ConnectionClosedOK,第二个ConnectionClosedError,第三个ConnectionClosed,然后才处理类似"的泛型;例外情况";或运行时。第一类的三个异常也是泛型";例外情况";因此它们将在之前(在您的代码上(进行处理
3'rd:在开发过程中(与部署不同(单独的IO/网络相关操作以单独的异常处理,这样您就可以验证是什么导致了异常,或者使用全局标志来识别原因:
while True:
flag_cause="ok"
try:
#sumary websocket publish#
flag_cause="dumps_num01"
json_summary_arr = json.dumps({"SUM":data_obj.updated_summary_df.to_dict('records'),"DATETIME":datetime.datetime.now()},default=str)
flag_cause="dumps_num02"
json_arr = json.dumps({"data_prov1":data_obj.df_bbg[["TENOR","BID","ASK",'TIME']].to_dict('records'),"DATETIME":datetime.datetime.now()},default=str)
flag_cause="dumps_send01"
await websocket.send(json_arr)
#sumary websocket publish#
flag_cause="dumps_send02"
await websocket.send(json_summary_arr)
诀窍是,这个标志是全局的,所以在异常上下文中有效,您可以在异常中打印它来验证原因。
第4个:websockets包使用任务异步循环上下文而不是线程websocket包使用线程。因此,要么只将工作转移到websocket包,要么使用任务和微服务而不是线程。
最好。ROri