我创建了一个方法,该方法从一个网页下载产品,然后将该产品存储到数据库SQLite3
中。
这个函数在正常调用时效果很好,但我想创建一个池并并行执行(因为发送并行请求)(网页允许机器人发送2000个请求/分钟)。
问题是,当我试图将它放入池时,它不会将数据存储到数据库中,也不会引发一些error or exception
。
if __name__ == '__main__':
pu = product_updater() # class which handles almost all, this class also has database manager class as an attribute
pool = Pool(10)
for line in lines[0:100]: # list lines is a list of urls
# pu.update_product(line[:-1]) # this works correctly
pool.apply_async(pu.update_product, args=(line[:-1],)) # this works correctly but does not store products into the database
pool.close()
pool.join()
def update_product(self,url): # This method belongs to product_updater class
prod = self.parse_product(url)
self.man.insert_product(prod) # man is a class to handling database
我使用这个池:from multiprocessing.pool import ThreadPool as Pool
你知道哪里出了问题吗?
编辑:我认为这可能是由于只有一个游标在工作人员之间共享,但我认为如果这是一个问题,它会引起一些Exception
。
EDIT2:奇怪的是,我试图使池只有1个工人,所以不应该有并发性的问题,但相同的结果-在数据库中没有新的行
multiprocessing。只要你不向任务请求确认,Pool就不会通知worker中发生的异常。
这个例子将是沉默的。
from multiprocessing import Pool
def function():
raise Exception("BOOM!")
p = Pool()
p.apply_async(function)
p.close()
p.join()
这个例子将显示异常。
from multiprocessing import Pool
def function():
raise Exception("BOOM!")
p = Pool()
task = p.apply_async(function)
task.get() # <---- you will get the exception here
p.close()
p.join()
问题的根本原因是共享单个游标对象,这不是线程/进程安全的。当多个工作线程在同一个游标上读/写时,事情会被破坏,并且池会静默地吃掉异常(om nom)。
第一个解决方案是确认我所展示的任务,以便使问题可见。然后,您可以做的是为每个worker获得一个专用游标。