Cryptofeed多处理未推送到队列



我正试图使用cryptofeed来获取多个进程上的多个令牌的提要。还可以检索这些提要,并使用多个工作进程将它们推送到数据库中。下面尝试的代码适用于馈送进程,但不适用于工作进程中的queue.get()


queue = multiprocessing.Queue()
async def ticker_function(data, time):
# Send data to queue
queue.put(data, block=False)
def new_feed(coin):
f = FeedHandler()
f.add_feed(cryptofeed.exchanges.Binance(
channels=[TICKER], 
symbols=[f"{coin}-USDT"],
callbacks={TICKER: ticker_function}
))
f.run()
def worker(queue):
print("Starting worker")
while True:
if not queue.empty():
data = queue.get(block=False) # THIS IS WHERE IT SAYS QUEUE EMPTY
json_payload = {
"measurement": data.symbol,
"time": int(time.time()*1000000000),
"fields": {
'bid': float(data.bid),
'ask': float(data.ask),
}
}
# Push to influxdb
write_api.write(bucket='test_bucket', org='mydb', record=json_payload)
else:
time.sleep(0.01)
if __name__ == '__main__':


data_process = multiprocessing.Process(target=worker, args=(queue,))
data_process.start()
feed_process = multiprocessing.Process(target=new_feed, args=('BTC',))
feed_process.start()

# To stop the main process from ending, adding a join (won't ever trigger)
feed_process.join()

抛出异常(我排除并通过的(:

Exception has occurred: Empty
exception: no description
File "/Users/user/sandbox/live-pnl/main.py", line 90, in worker
data = queue.get(block=False)
File "<string>", line 1, in <module>

我做错了什么?

编辑:更改代码后跟踪另一个错误

Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/opt/homebrew/Cellar/python@3.10/3.10.6_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "/opt/homebrew/Cellar/python@3.10/3.10.6_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
File "/opt/homebrew/Cellar/python@3.10/3.10.6_2/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/synchronize.py", line 110, in __setstate__
self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

在文档之后,如果队列为空,并且您使用queue.get(block=True)请求项目,则会引发异常queue.Empty

相反,您可能希望block=False等待一个值,直到它在队列中可用。

或者,首先检查队列是否为空,然后请求一个项目:

import time
def worker(queue):
print("Starting worker")
while True:
if not queue.empty()
data = queue.get()
json_payload = {
"measurement": data.symbol,
"time": int(time.time()*1000000000),
"fields": {
'bid': float(data.bid),
'ask': float(data.ask),
}
}
# Push to influxdb
write_api.write(bucket='test_bucket', org='mydb', record=json_payload)
else:
time.sleep(0.01) # sanity delay 

相关内容

  • 没有找到相关文章

最新更新