我正在研究实时数据抓取器。我有一段时间的 True 循环,在其中,我生成了执行相对较小的任务的线程(我正在通过 HTTP 查询第三方 API,并且为了实现快速速度,我正在并行查询(。
每个线程负责更新特定的数据系列。这可能需要 2、3 甚至 5 秒。但是,我的 while True 循环生成线程的速度可能比线程完成所需的时间更快。因此,我需要生成的线程等待其先前的线程完成。
通常,线程完成需要多长时间是不可预测的,因为线程查询 HTTP 服务器...
我正在考虑为每个线程创建一个命名信号量,然后如果为特定系列生成的线程找到在同一系列上工作的先前线程,它将等待。
我能看到的唯一问题是线程可能积压。
这里最好的解决方案是什么?我应该研究芹菜之类的东西吗?我目前正在使用线程模块。
谢谢!
不! 拜託,為了你的上帝或聰明設計師的愛,不要這樣做! 不要不断地创建/生成/任何线程并尝试对它们进行微观管理。 线程池 - 在启动时创建一些线程,并传递给它们生产者-消费者队列,以等待表示这些 HTTP 任务的类实例。
你应该使用 Queue.Queue
. 为每个系列创建一个队列,并创建一个线程来侦听该队列。 每次需要读取序列时,请在队列中放置一个请求。 线程等待队列中的项目,它接收的每个项目都会读取数据。
如果您只是在每次查询返回时重新查询 API,您可以使用的另一个选项是像 Twisted(他们的线程教程(这样的异步框架。 我是一个相对扭曲的初学者,所以可能有比这个更好的方法来扭曲扭曲你的任务 -
from twisted.internet import reactor, defer
def simple_task():
status = query_your_api()
return status
def repeating_call(status):
print(status)
d = threads.deferToThread(simple_task)
d.addCallback(repeating_call)
data_series = [data1, data2, data3]
for data in data_series:
repeating_call('starting everything up')
reactor.run()