python多处理调度任务



我有8个CPU核心和200个任务要做。每个任务都是孤立的。没有必要等待或分享结果。我正在寻找一种一次运行8个任务/进程的方法(最大值),并在其中一个完成时运行。剩下的任务将自动启动进程。

如何知道子进程何时完成并启动新的子进程。首先,我尝试使用进程(多处理),但很难弄清楚。然后我尝试使用池,并面临pickle问题,因为我需要使用动态实例化。

编辑:添加我的池代码

class Collectorparallel():
def fire(self,obj):
    collectorController = Collectorcontroller()
    collectorController.crawlTask(obj)
def start(self):
    log_to_stderr(logging.DEBUG)
    pluginObjectList = []
    for pluginName in self.settingModel.getAllCollectorName():
        name = pluginName.capitalize()
        #Get plugin class and instanitiate object
        module = __import__('plugins.'+pluginName,fromlist=[name])
        pluginClass = getattr(module,name)
        pluginObject = pluginClass()
        pluginObjectList.append(pluginObject)

    pool = Pool(8)
    jobs = pool.map(self.fire,pluginObjectList)
    pool.close()
    print pluginObjectList

pluginObjectList得到了类似的东西

[<plugins.name1.Name1 instance at 0x1f54290>, <plugins.name2.Name2 instance at 0x1f54f38>]

PicklingError:无法pickle:属性查找内置。instancemethod失败

但是Process版本运行良好

警告这对部署和情况来说有点主观,但我目前的设置如下

我有一个工人程序,我启动了6个副本(我有6个核心)。每个工人都要做以下工作;

  1. 连接到Redis实例
  2. 尝试弹出特定列表中的一些工作
  3. 推送日志记录信息
  4. 空闲或因"队列"中缺少工作而终止

然后,每个程序基本上都是独立的,同时仍然使用单独的排队系统来完成所需的工作。由于您在流程上没有中间人,因此这可能是您问题的解决方案。

我不是Python中多处理的专家,但我在这个帮助下尝试了一些新的东西http://www.tutorialspoint.com/python/python_multithreading.htm还有这个http://www.devshed.com/c/a/Python/Basic-Threading-in-Python/1/。

例如,您可以使用此方法isAlive来回答您的问题。

问题的解决方案很简单。首先,请注意方法不能腌制。事实上,只有pickle的文档中列出的类型可以进行pickle:

  • NoneTrueFalse
  • 整数、长整数、浮点数、复数
  • 普通字符串和Unicode字符串
  • 仅包含可拾取对象的tuplelistsetdict系列
  • 在模块顶层定义的函数
  • 模块顶层定义的内置函数
  • 在模块顶层定义的类
  • __dict__或调用__getstate__()的结果可拾取的此类实例(请参阅pickle协议一节详细信息)

[…]

请注意,函数(内置和用户定义)由"完全限定"名称引用,不按值。这意味着只有函数名称以及函数在中定义的模块名称被pickle。既不是函数的代码,也不是其功能属性被腌制。因此定义模块必须可在取消拾取环境中导入,并且模块必须包含命名对象,否则将引发异常。[4]

类似地,类通过命名引用进行pickle,因此取消拾取环境中的限制适用。请注意,类的代码或数据被pickle[…]

很明显,方法不是在模块的顶层定义的函数,因此不能对其进行pickle。(请仔细阅读文档的这一部分,以避免pickle将来出现问题!)但是,用全局函数替换该方法并将self作为附加参数传递是非常简单的:

import itertools as it

def global_fire(argument):
    self, obj = argument
    self.fire(obj)

class Collectorparallel():
    def fire(self,obj):
        collectorController = Collectorcontroller()
        collectorController.crawlTask(obj)
    def start(self):
        log_to_stderr(logging.DEBUG)
        pluginObjectList = []
        for pluginName in self.settingModel.getAllCollectorName():
            name = pluginName.capitalize()
            #Get plugin class and instanitiate object
            module = __import__('plugins.'+pluginName,fromlist=[name])
            pluginClass = getattr(module,name)
            pluginObject = pluginClass()
            pluginObjectList.append(pluginObject)

        pool = Pool(8)
        jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList))
        pool.close()
        print pluginObjectList

请注意,由于Pool.map只使用一个参数调用给定的函数,因此我们必须将self和实际参数"打包"在一起。为了做到这一点,我有zip ped it.repeat(self)和原始的可迭代。

如果您不关心调用的顺序,那么使用pool.imap_unordered可能会提供更好的性能。然而,它返回的是可迭代的,而不是列表,所以如果您想要结果列表,则必须执行jobs = list(pool.imap_unordered(...))

我相信这段代码将消除所有酸洗问题。

class Collectorparallel():
def __call__(self,cNames):
    for pluginName in cNames:
        name = pluginName.capitalize()
        #Get plugin class and instanitiate object
        module = __import__('plugins.'+pluginName,fromlist=[name])
        pluginClass = getattr(module,name)
        pluginObject = pluginClass()
        pluginObjectList.append(pluginObject)
    collectorController = Collectorcontroller()
    collectorController.crawlTask(obj)
def start(self):
    log_to_stderr(logging.DEBUG)
    pool = Pool(8)
    jobs = pool.map(self,self.settingModel.getAllCollectorName())
    pool.close()

这里发生的事情是Collectorparallel已经变成了可调用的。插件名称列表用作池的可迭代项,插件的实际确定及其实例化在每个工作进程中完成,类实例对象用作每个工作进程的可调用项。

最新更新