Python:调度在线处理大容量实时数据流



我在不同的websockets上有一些大容量数据流(传感器数据-每月几tb),其中

我想保证所有的数据都被存储

所以我想要将数据发送到我的数据库和实时处理模块(例如GUI, ML预测等),以

it缓冲数据流,以防所述模块的处理速度太慢,以便在负载减少时可以'赶上'.

What I tried:python线程与队列(从队列模块或线程模块),但如果它是阻塞的,我不能确保数据不拥挤,如果它的非阻塞(例如asyncio.Queue),我得到竞争条件和事情爆发。

也许我应该使用一些回调方法但我不知道要找什么。我希望这个问题不要太模糊。如果有人有指针,我可以试试最好只使用python,这真的会帮我很多,即使这只是一个想法。

所以也许我应该使用某种回调方法但我不知道要寻找什么

看起来你需要一个Future

  • concurrent.futures.Future()
  • asyncio.Future()

我尝试过的:python线程与队列(从队列模块或线程模块),但如果它是阻塞的,我不能确保数据不拥挤,如果它的非阻塞(例如asyncio.Queue)我得到竞争条件和事情爆炸。

您可以尝试使用Queue的非阻塞方法:

  • Queue.put_nowait()
  • Queue.get_nowait()

queue.Emptyqueue.Full异常被捕获时查询数据库

最新更新