使用多处理.在路易吉排队



我在luigi中有一组任务,这些任务都需要访问数据库。我最多可以同时有 8 个任务访问我的数据库,前提是它们位于不同的端口上(我有允许的端口列表(。 我应该如何最好地实现这个限制,它似乎类似于工作线程数量的标准限制,即就我而言,当工作线程空闲且数据库端口空闲时,任务应该运行。

我尝试在__main__中创建一个multiprocessing.Queue()并将其传递给WrapperTask, 将其作为luigi.Parameter()接收,但这给出了错误并挂起

UserWarning: Parameter "queue" with value <multiprocessing.queues.Queue object at 0x00000000149E4518>" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))

这个想法是,如果队列为空,.get()调用将挂起一个任务,并在另一个任务再次.put(port)后继续。

这里出了什么问题?还是我采取了完全错误的方法来管理 luigi 中的资源?

您应该在 Luigi 配置中使用"资源"部分。这将确保不超过这个数量的工作人员共享全球资源。https://luigi.readthedocs.io/en/stable/configuration.html#resources 在此处查找更多信息。

最新更新