我正在开发一个金融应用程序,它(间接地(使用websocket-client每秒接收来自NseIndia的实时提要。
程序结构由并行运行的多个任务组成。
所有这些任务都需要同时提供各种交易品种的实时馈送。
每个符号都是一个 Python 对象,其中包含请求该符号的任务列表。
源源模块每秒接收数百个源。它有一个方法"distribute((",用于隔离传入的提要并将其发送到相应的交易品种。然后,符号进一步将源重新分发到所有需要它的任务。
为了实现这种并行性,我一直在使用 python 线程模块的线程。
问题是 CPU 一次不能处理超过 1000 个线程,抛出
"RuntimeError: can't start new thread"
完整回溯:
Exception in thread Thread-6542:
Traceback (most recent call last):
File "C:UsersPC2AppDataLocalProgramsPythonPython37-32libthreading.py", line 917, in _bootstrap_inner
self.run()
File "C:UsersPC2AppDataLocalProgramsPythonPython37-32libthreading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "C:UsersPC2PycharmProjectsFeedtest2ArenaCerebelle.py", line 49, in quote_distribute
inx.feed(quote)
File "C:UsersPC2PycharmProjectsFeedtest2ArenaInstrument.py", line 23, in feed
Thread(target=sprite.feed, args=(quote,), daemon=True).start()
File "C:UsersPC2AppDataLocalProgramsPythonPython37-32libthreading.py", line 847, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
请提出解决方法或替代方法!
谢谢!
工作流的图形表示
与其有那么多线程,不如使用消息系统(如RabbitMQ,Apache Kafka(。将从 websocket 获得的消息推送到相应的队列,然后一个或多个单独的工作进程可以按照接收顺序处理它们。