我有8个CPU核心和200个任务要做。每个任务都是孤立的。没有必要等待或分享结果。我正在寻找一种一次运行8个任务/进程的方法(最大值),并在其中一个完成时运行。剩下的任务将自动启动进程。
如何知道子进程何时完成并启动新的子进程。首先,我尝试使用进程(多处理),但很难弄清楚。然后我尝试使用池,并面临pickle问题,因为我需要使用动态实例化。
编辑:添加我的池代码
class Collectorparallel():
def fire(self,obj):
collectorController = Collectorcontroller()
collectorController.crawlTask(obj)
def start(self):
log_to_stderr(logging.DEBUG)
pluginObjectList = []
for pluginName in self.settingModel.getAllCollectorName():
name = pluginName.capitalize()
#Get plugin class and instanitiate object
module = __import__('plugins.'+pluginName,fromlist=[name])
pluginClass = getattr(module,name)
pluginObject = pluginClass()
pluginObjectList.append(pluginObject)
pool = Pool(8)
jobs = pool.map(self.fire,pluginObjectList)
pool.close()
print pluginObjectList
pluginObjectList得到了类似的东西
[<plugins.name1.Name1 instance at 0x1f54290>, <plugins.name2.Name2 instance at 0x1f54f38>]
PicklingError:无法pickle:属性查找内置。instancemethod失败
但是Process版本运行良好
警告这对部署和情况来说有点主观,但我目前的设置如下
我有一个工人程序,我启动了6个副本(我有6个核心)。每个工人都要做以下工作;
- 连接到Redis实例
- 尝试弹出特定列表中的一些工作
- 推送日志记录信息
- 空闲或因"队列"中缺少工作而终止
然后,每个程序基本上都是独立的,同时仍然使用单独的排队系统来完成所需的工作。由于您在流程上没有中间人,因此这可能是您问题的解决方案。
我不是Python中多处理的专家,但我在这个帮助下尝试了一些新的东西http://www.tutorialspoint.com/python/python_multithreading.htm还有这个http://www.devshed.com/c/a/Python/Basic-Threading-in-Python/1/。
例如,您可以使用此方法isAlive
来回答您的问题。
问题的解决方案很简单。首先,请注意方法不能腌制。事实上,只有pickle
的文档中列出的类型可以进行pickle:
None
、True
和False
- 整数、长整数、浮点数、复数
- 普通字符串和Unicode字符串
- 仅包含可拾取对象的
tuple
、list
、set
和dict
系列- 在模块顶层定义的函数
- 模块顶层定义的内置函数
- 在模块顶层定义的类
__dict__
或调用__getstate__()
的结果可拾取的此类实例(请参阅pickle协议一节详细信息)[…]
请注意,函数(内置和用户定义)由"完全限定"名称引用,不按值。这意味着只有函数名称以及函数在中定义的模块名称被pickle。既不是函数的代码,也不是其功能属性被腌制。因此定义模块必须可在取消拾取环境中导入,并且模块必须包含命名对象,否则将引发异常。[4]
类似地,类通过命名引用进行pickle,因此取消拾取环境中的限制适用。请注意,类的代码或数据被pickle[…]
很明显,方法不是在模块的顶层定义的函数,因此不能对其进行pickle。(请仔细阅读文档的这一部分,以避免pickle将来出现问题!)但是,用全局函数替换该方法并将self
作为附加参数传递是非常简单的:
import itertools as it
def global_fire(argument):
self, obj = argument
self.fire(obj)
class Collectorparallel():
def fire(self,obj):
collectorController = Collectorcontroller()
collectorController.crawlTask(obj)
def start(self):
log_to_stderr(logging.DEBUG)
pluginObjectList = []
for pluginName in self.settingModel.getAllCollectorName():
name = pluginName.capitalize()
#Get plugin class and instanitiate object
module = __import__('plugins.'+pluginName,fromlist=[name])
pluginClass = getattr(module,name)
pluginObject = pluginClass()
pluginObjectList.append(pluginObject)
pool = Pool(8)
jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList))
pool.close()
print pluginObjectList
请注意,由于Pool.map
只使用一个参数调用给定的函数,因此我们必须将self
和实际参数"打包"在一起。为了做到这一点,我有zip
ped it.repeat(self)
和原始的可迭代。
如果您不关心调用的顺序,那么使用pool.imap_unordered
可能会提供更好的性能。然而,它返回的是可迭代的,而不是列表,所以如果您想要结果列表,则必须执行jobs = list(pool.imap_unordered(...))
。
我相信这段代码将消除所有酸洗问题。
class Collectorparallel():
def __call__(self,cNames):
for pluginName in cNames:
name = pluginName.capitalize()
#Get plugin class and instanitiate object
module = __import__('plugins.'+pluginName,fromlist=[name])
pluginClass = getattr(module,name)
pluginObject = pluginClass()
pluginObjectList.append(pluginObject)
collectorController = Collectorcontroller()
collectorController.crawlTask(obj)
def start(self):
log_to_stderr(logging.DEBUG)
pool = Pool(8)
jobs = pool.map(self,self.settingModel.getAllCollectorName())
pool.close()
这里发生的事情是Collectorparallel
已经变成了可调用的。插件名称列表用作池的可迭代项,插件的实际确定及其实例化在每个工作进程中完成,类实例对象用作每个工作进程的可调用项。