下面的代码模拟生产者和消费者模型,该模型将从外汇经纪商FXCM收集数据并写入数据库。 每个生产者进程都将与代理建立基于会话的连接。
生产者和消费者都将无限期运行,直到"毒丸"被放入队列中,这发生在营业结束时(星期五 22:00)。我省略了这部分代码,因为它与问题无关。我能找到的所有示例似乎都会生成一个流程,在短时间内做一些工作,然后join()
回到父流程。像这里这个
如上所述,生产者将无限期运行,其原因是登录并与代理创建会话大约需要 3 秒。
运行下面的代码时,您将看到队列积压工作,尽管在运行实际代码时这似乎更糟。
不确定它是否相关,但会话是使用 python-forexconnect API 创建的,该 API 是用 C++ 编写并使用 Boost 编写的。
问题是消费者从队列中get()
项目花费的时间太长,我想知道这个模型是否是处理此类开发的正确方法。
谢谢
示例代码
from multiprocessing import Process, Queue, cpu_count
from datetime import datetime, timedelta
import numpy as np
import time
def dummy_data(dtto):
dates = np.array(
[dtto - timedelta(days=i) for i in range(300)])
price_data = np.random.rand(len(dates),5)
return np.concatenate(
(np.vstack(dates),price_data), axis=1)
def get_bars(q2, ms, symbol, dtfm, dtto, time_frame):
stop_date = dtfm
while dtto > stop_date:
data = dummy_data(dtto)
dtfm = data[-1,0]
dtto = data[0,0]
q2.put((symbol, dtfm, dtto))
# Switch to date
dtto = dtfm
def producer(q1,q2):
# client = fx.Client(....)
client = 'broker session'
while True:
job = q1.get()
if job == None:
break
sym, dtfm, dtto, tf = job
# Get price data from broker
get_bars(q2, client, sym, dtfm, dtto, tf)
q2.put(None)
def consumer(q2):
while True:
bars = q2.get()
if bars == None:
break
print(q2.qsize(), bars[0], bars[1], bars[2]) # write to db
q1, q2 = Queue(), Queue()
# instruments = client.get_offers()
# instruments = ['GBP/USD', 'EUR,USD',...]
instruments = range(63) # 62 dummy instruments
# Places jobs into the queue for each symbol
for symbol in instruments:
q1.put((symbol,
datetime(2000,1,14,22,0),
datetime(2018,1,14,22,0),
'D1'))
# Setup producers and consumers
pp, cp = range(6), range(2)
pro = [Process(target=producer, args=(q1, q2,)) for i in pp]
con = [Process(target=consumer, args=(q2,)) for i in cp]
for p in pro: p.start()
for p in con: p.start()
# This is just here to stop this script and does not
# exist in the real version
for i in pp: q1.put(None)
for p in pro: p.join()
for p in con: p.join()
print('stopped')
multiprocessing.Queue.get()
的可怕性能是一个已知问题(关于Stackoverflow的几个问题,但没有通常有用的答案)。
哪种表示您应该考虑另一种模型。您可以看到与此相比有多少流程创建开销; 根本不要使用永久运行的进程,而是在准备好数据后立即启动进程。执行此操作时,子进程将在进程分叉时收到内存中的数据副本。这会增加进程创建开销,但会删除队列。 您至少可以考虑这一点,因为您的使用者写入数据库并且不需要向父级报告任何内容。
Python是一种很棒的语言,但在并行处理方面并不是性能最好的。