我正在尝试从 TWS API 下载历史数据,并通过设置 keepUpToDate=True 使数据保持最新。到目前为止,我已经能够从 TWS API 下载历史数据,并将其存储在 DataFrame(数据帧)中。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
class TradingApp(EWrapper, EClient):
    """Combined wrapper/client object that prints each historical bar it is given."""

    def __init__(self):
        # The same object acts as both client and callback receiver, so it is
        # handed to EClient as its own wrapper.
        EClient.__init__(self, wrapper=self)

    def historicalData(self, reqId, bar):
        """Print the bar received for request ``reqId``."""
        print("HistoricalData. ReqId:", reqId, "BarData.", bar)
def websocket_con():
    """Call ``run()`` on the module-level ``app``; used as the connection thread's target."""
    app.run()
# Create the client and connect it to the local endpoint on port 7497.
app = TradingApp()
app.connect(host="127.0.0.1", port=7497, clientId=1)

# app.run() is executed on a daemon thread so the main thread stays free to
# issue requests.
con_thread = threading.Thread(
    target=websocket_con,
    daemon=True,
)
con_thread.start()

# some latency added to ensure that the connection is established
time.sleep(1)
def usTechStk(symbol, sec_type="STK", currency="USD", exchange="ISLAND"):
    """Build and return a ``Contract`` for ``symbol``.

    ``sec_type``, ``currency`` and ``exchange`` are copied onto the contract's
    ``secType``, ``currency`` and ``exchange`` attributes respectively.
    """
    stock = Contract()
    stock.exchange = exchange
    stock.currency = currency
    stock.secType = sec_type
    stock.symbol = symbol
    return stock
def histData(req_num, contract, duration, candle_size):
    """Send a historical-data request for ``contract`` through the module-level ``app``.

    ``req_num`` becomes the request id, ``duration`` the ``durationStr`` and
    ``candle_size`` the ``barSizeSetting``.  The remaining options are fixed:
    TRADES data, ``useRTH=0``, ``formatDate=1`` and ``keepUpToDate=True``.
    """
    request = {
        "reqId": req_num,
        "contract": contract,
        "endDateTime": '',
        "durationStr": duration,
        "barSizeSetting": candle_size,
        "whatToShow": 'TRADES',
        "useRTH": 0,
        "formatDate": 1,
        "keepUpToDate": True,
        "chartOptions": [],
    }
    app.reqHistoricalData(**request)
tickers = ["META", "AMZN", "INTC"]

# Request history for each ticker, using its position in the list as the
# request id.  enumerate() supplies that position directly; looking it up with
# tickers.index() would rescan the list on every iteration and would hand the
# same id to a symbol that appears twice.
for req_id, ticker in enumerate(tickers):
    print(ticker)
    histData(req_id, usTechStk(ticker), '2 D', '1 min')
    # Pause before the next request so the previous one has time to be served.
    time.sleep(10)
我还设法使历史数据保持最新。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import threading
import time
class TradingApp(EWrapper, EClient):
    """Combined wrapper/client object that prints historical bars and their updates."""

    def __init__(self):
        # The same object acts as both client and callback receiver, so it is
        # handed to EClient as its own wrapper.
        EClient.__init__(self, wrapper=self)

    def historicalData(self, reqId, bar):
        """Print a bar received for request ``reqId``."""
        print("HistoricalData. ReqId:", reqId, "BarData.", bar)

    def historicalDataUpdate(self, reqId, bar):
        """Print an updated bar received for request ``reqId``."""
        print("HistoricalDataUpdate. ReqId:", reqId, "BarData.", bar)
def websocket_con():
    """Call ``run()`` on the module-level ``app``; used as the connection thread's target."""
    app.run()
# Create the client, connect, and run app.run() on a daemon thread.
app = TradingApp()
app.connect("127.0.0.1", 7497, clientId=1)
con_thread = threading.Thread(target=websocket_con, daemon=True)
con_thread.start()
time.sleep(1)  # give the connection a moment before sending requests

# Contract to request bars for.
contract = Contract()
contract.symbol = "META"
contract.secType = "STK"
contract.currency = "USD"
contract.exchange = "ISLAND"

# keepUpToDate=True asks for the request to stay open after the initial
# history has been delivered.
app.reqHistoricalData(reqId=1,
                      contract=contract,
                      endDateTime='',
                      durationStr='2 D',
                      barSizeSetting='1 min',
                      whatToShow='TRADES',
                      useRTH=0,
                      formatDate=1,
                      keepUpToDate=True,
                      chartOptions=[])

# historicalDataUpdate is a callback: the API invokes it with real bar objects
# as updates arrive.  It must not be called by hand (doing so with a
# placeholder string only prints that string), so the script simply waits here
# while the connection thread delivers bars.  Lengthen this wait, or keep the
# main thread alive some other way, to continue receiving updates.
time.sleep(5)
如何让第一个代码块中的 DataFrame(数据帧)随流式数据持续更新?
我已经解决了这个问题,现将代码分享给其他遇到同样问题的人。
# Import libraries
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import threading
import time
class TradeApp(EWrapper, EClient):
    """Client that accumulates the bars it receives, grouped by request id.

    ``self.data`` maps each ``reqId`` to a list of dicts with the keys
    ``Date``, ``Open``, ``High``, ``Low``, ``Close`` and ``Volume``.
    """

    def __init__(self):
        EClient.__init__(self, self)
        self.data = {}

    @staticmethod
    def _bar_to_row(bar):
        """Convert a bar object into the dict layout stored in ``self.data``."""
        return {
            "Date": bar.date,
            "Open": bar.open,
            "High": bar.high,
            "Low": bar.low,
            "Close": bar.close,
            "Volume": bar.volume,
        }

    def historicalData(self, reqId, bar):
        """Store one bar of the historical download for ``reqId`` and print it."""
        self.data.setdefault(reqId, []).append(self._bar_to_row(bar))
        print("reqID:{}, date:{}, open:{}, high:{}, low:{}, close:{}, volume:{}".format(reqId,bar.date,bar.open,bar.high,bar.low,bar.close,bar.volume))

    def historicalDataUpdate(self, reqId, bar):
        """Merge a streamed bar into the rows already stored for ``reqId``.

        Updates for a request id with no stored history are ignored.  When the
        update carries the same ``Date`` as the most recent stored row it
        replaces that row; otherwise it is appended as a new row.
        """
        if reqId not in self.data:
            return
        print('reqid in data')
        row = self._bar_to_row(bar)
        rows = self.data[reqId]
        # The bar that is still forming is delivered repeatedly with an
        # unchanged date; overwriting keeps one row per date instead of
        # piling up duplicates.
        if rows and rows[-1]["Date"] == row["Date"]:
            rows[-1] = row
        else:
            rows.append(row)
def usTechStk(symbol, sec_type="CASH", currency="JPY", exchange="IDEALPRO"):
    """Build and return a ``Contract`` for ``symbol``.

    Despite the function's name, the defaults here are ``CASH`` / ``JPY`` /
    ``IDEALPRO``.  Each argument is copied onto the matching contract
    attribute (``sec_type`` goes to ``secType``).
    """
    pair = Contract()
    attributes = (
        ("symbol", symbol),
        ("secType", sec_type),
        ("currency", currency),
        ("exchange", exchange),
    )
    for attr_name, attr_value in attributes:
        setattr(pair, attr_name, attr_value)
    return pair
def histData(req_num, contract, duration, candle_size):
    """Send a historical-data request for ``contract`` through the module-level ``app``.

    ``req_num`` becomes the request id, ``duration`` the ``durationStr`` and
    ``candle_size`` the ``barSizeSetting``.  The remaining options are fixed:
    MIDPOINT data, ``useRTH=0``, ``formatDate=1`` and ``keepUpToDate=True``.
    """
    # Options that are the same for every request made by this script.
    fixed_options = dict(
        endDateTime='',
        whatToShow='MIDPOINT',
        useRTH=0,
        formatDate=1,
        keepUpToDate=True,
        chartOptions=[],
    )
    app.reqHistoricalData(
        reqId=req_num,
        contract=contract,
        durationStr=duration,
        barSizeSetting=candle_size,
        **fixed_options,
    )
def websocket_con():
    """Call ``run()`` on the module-level ``app``; used as the connection thread's target."""
    app.run()
# Create the client and connect it to the local endpoint.
# port 4002 for ib gateway paper trading/7497 for TWS paper trading
app = TradeApp()
app.connect(
    host='127.0.0.1',
    port=7497,
    clientId=23,
)

# app.run() is executed on a daemon thread so the main thread stays free to
# issue requests.
con_thread = threading.Thread(
    target=websocket_con,
    daemon=True,
)
con_thread.start()

# some latency added to ensure that the connection is established
time.sleep(1)
tickers = ['USD', 'EUR']

# Request history for each ticker, using its position in the list as the
# request id.  enumerate() supplies that position directly; looking it up with
# tickers.index() would rescan the list on every iteration and would hand the
# same id to a symbol that appears twice.
for req_id, ticker in enumerate(tickers):
    histData(req_id, usTechStk(ticker), '2 D', '1 min')
    # Pause before the next request so the previous one has time to be served.
    time.sleep(5)
###################storing trade app object in dataframe#######################
def dataDataframe(symbols, TradeApp_obj):
    """Return the collected bars as one DataFrame per symbol, indexed by ``Date``.

    Args:
        symbols: Sequence of symbols; the position of a symbol's first
            occurrence is the request id its bars are stored under.
        TradeApp_obj: Object whose ``data`` attribute maps request ids to
            lists of row dicts (``Date``, ``Open``, ``High``, ``Low``,
            ``Close``, ``Volume``).

    Returns:
        Dict mapping each symbol to a DataFrame indexed by ``Date``.  A symbol
        with no rows yet gets an empty frame with the expected columns, and
        when a date occurs more than once only its latest row is kept.
    """
    columns = ["Date", "Open", "High", "Low", "Close", "Volume"]
    df_data = {}
    for req_id, symbol in enumerate(symbols):
        if symbol in df_data:
            # A repeated symbol keeps the frame built from its first position.
            continue
        # Copy the list first: callbacks may still be appending to it while
        # the frame is being built.
        rows = list(TradeApp_obj.data.get(req_id, ()))
        if rows:
            frame = pd.DataFrame(rows)
        else:
            frame = pd.DataFrame(columns=columns)
        frame = frame.set_index("Date")
        # Streamed updates can repeat a date; keep only the newest row for it.
        frame = frame[~frame.index.duplicated(keep="last")]
        df_data[symbol] = frame
    return df_data
# Rebuild and print the per-ticker DataFrames once a minute so that bars
# received since the last pass are included.
try:
    while True:
        # extract and store historical data in dataframe
        historicalData = dataDataframe(tickers, app)
        print(historicalData)
        time.sleep(60)
except KeyboardInterrupt:
    # The loop has no other exit; on Ctrl-C close the API connection instead
    # of ending with a traceback and the session still open.
    app.disconnect()