我正在尝试新的Python Interactive Broker API,但是我在第一步遇到了一些严重的速度问题...
以下代码(见下文(times
0:00:08.832813
,直到收到数据
0:00:36.000785
直到应用完全断开...
为什么这么慢?加快速度的最佳方法是什么?
from ibapi import wrapper
from ibapi.client import EClient
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
import datetime
from datetime import timedelta
class DataApp(wrapper.EWrapper, EClient):
def __init__(self):
wrapper.EWrapper.__init__(self)
EClient.__init__(self, wrapper=self)
@iswrapper
def historicalData(self, reqId: TickerId, date: str, open: float, high: float,
low: float, close: float, volume: int, barCount: int,
WAP: float, hasGaps: int):
super().historicalData(reqId, date, open, high, low, close, volume,
barCount, WAP, hasGaps)
print("HistoricalData. ", reqId, " Date:", date, "Open:", open,
"High:", high, "Low:", low, "Close:", close, "Volume:", volume)
@iswrapper
def historicalDataEnd(self, reqId: int, start: str, end: str):
super().historicalDataEnd(reqId, start, end)
print("HistoricalDataEnd ", reqId, "from", start, "to", end)
print(datetime.datetime.now()-startime)
self.done = True # This ends the messages loop - this was not in the example code...
def get_data(self):
self.connect("127.0.0.1", 4002, clientId=10)
print("serverVersion:%s connectionTime:%s" % (self.serverVersion(),
self.twsConnectionTime()))
cont = Contract()
cont.symbol = "ES"
cont.secType = "FUT"
cont.currency = "USD"
cont.exchange = "GLOBEX"
cont.lastTradeDateOrContractMonth = "201706"
self.reqHistoricalData(1, cont, datetime.datetime.now().strftime("%Y%m%d %H:%M:%S"),
"1800 S", "30 mins", "TRADES", 0, 1, [])
self.run()
self.disconnect()
print(datetime.datetime.now()-startime)
global starttime
startime = datetime.datetime.now()
DA = DataApp()
DA.get_data()
我还尝试不断运行它是背景,以便仅用
即时提交请求def runMe():
app.run() # where run() has be removed from the class definition
import threading
thread = threading.Thread(target = runMe)
thread.start()
但也很慢。任何建议都赞赏
我建议您修改IBAPI模块中连接类中的连接套接字锁定。该建议来自Github上的Heshiming;如果您可以访问私人互动经纪人存储库,则可以在此处访问讨论
我做到了,它显着提高了性能。
heshiming建议您减少套接字锁定对象上的超时,每次发送或接收消息时都会调用。要修改套接字锁,请转到IBAPI的站点包装文件夹,然后在连接中修改连接函数。这是我拥有的版本的连接48行。
如果您看不到Heshiming的帖子,我将其包含在本文的底部。
替代选项:另一个有趣的解决方案是利用异步的异步事件循环。我没有这样做,但看起来很有希望。请参阅示例Ewald组合https://github.com/erdewit/tws_async
heshiming评论:
连接/ibapi/connection.py的实现 在SendMSG和RECVMSG中都有一个锁定对象。自连接以来, self.socket.settimeout(1(被调用,因此基础 self.socket.recv(4096(每秒一次仅一次。
这种实现会产生绩效问题。因为锁是 共享,插座在接收时无法发送数据。在情况下 收到的消息小于4K字节长,recvMSG 功能将等待1秒钟,然后释放锁,制作 随后的sendmsg等待。在我的实验中,大多数消息似乎是 比4K字节短。换句话说,这强加了一个帽子 RECVMSG每秒。
有一些减轻这种情况的策略。一个人可以减少 接收缓冲区至小于4K的数字,或者减少插座 超时到0.001秒以使其更少。
或根据 http://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-send-recv-on-the-same-same-samet-valid ,插座本身实际上是线程安全。因此没有锁 必要。
我尝试了所有三种策略。卸下锁最好。和 将超时减少到0.001的工作方式相似。
我只能保证Linux/Unix平台,我没有尝试过 视窗。您是否考虑更改实施以改进 这个?
当前API(2019年5月(通过删除recvMsg
中的锁定,如@Benm在答案中提出的那样。但是,取决于请求的类型,它仍然可能很慢。
删除大部分日志记录有所帮助,大概是因为某些MSG类型很大。我最初尝试使用logging
库对其进行过滤,但是Flat-Out删除代码的速度更快,然后将其放在生成较大字符串之前甚至将其传递给logging
之前所需的额外处理。
用Deque 替换队列:也可能值得用COC_7中的Queue
用Collections.deque替换CC_6,因为deque
速度> 10倍。我已经在其他地方进行了大速速度,但还没有为此而努力。deque
应该是线程安全的,而无需以此处使用的方式使用:" Deques支持线程安全,内存有效的附加和从Deque的任一侧的弹出"(来自DOCS(。
队列与Deque速度比较
app.run()
是一个无限的回路,而app.done == False
是一个无限的回路,但是当将app.done
设置为true时,它不会立即停止。(我不知道为什么(。
我所做的是编写一种新方法,而不是使用app.run()
。
这是我的解决方案:
import time
from ibapi import (decoder, reader, comm)
并将此功能放入您的客户端类。
def getMessage(self, wait=3):
# wait 3 secs for response to come in
time.sleep(wait)
# get everything in app.msg_queue
while not self.msg_queue.empty():
text = self.msg_queue.get(block=True, timeout=0.2)
fields = comm.read_fields(text)
self.decoder.interpret(fields)
用法很简单。只需使用app.getMessage()
而不是app.run()