Multiprocessing ThreadPool returns null results



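The list of tuples below is a subset of a huge dataset. I have been trying to use multithreading to reduce the computation time, but the dfsi list comes back with null (empty) results. Why?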

import numpy as np
from multiprocessing.pool import ThreadPool

# df is the full DataFrame, defined elsewhere.
dfsi = []
tup = [(28075,69),(28075,72),(28075,73),(28075,76),(28075,96),(28075,99),
       (28075,102),(28075,103),(28075,162),(28075,165)]

def multi_processing_tuples(sku, ids):
    Q0 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 0)
    Q4 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 100)
    dfsi.append((sku, ids, Q0, Q4))

pool_size = 5
pool = ThreadPool(pool_size)
for (sku, ids) in tup:
    pool.apply_async(multi_processing_tuples, ((sku, ids),))
pool.close()
pool.join()

Edit:

import numpy as np
from multiprocessing.pool import ThreadPool

# df is the full DataFrame, defined elsewhere.
dfsi = []
tup = [(28075,69),(28075,72),(28075,73),(28075,76),(28075,96),(28075,99),
       (28075,102),(28075,103),(28075,162),(28075,165)]

def multi_processing_tuples(sku, ids):
    Q0 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 0)
    Q4 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 100)
    return (sku, ids, Q0, Q4)

pool_size = 5
pool = ThreadPool(pool_size)
for (sku, ids) in tup:
    dfsi.append(pool.apply_async(multi_processing_tuples, ((sku, ids),)))
pool.close()
pool.join()

Now dfsi prints as:

[<multiprocessing.pool.ApplyResult at 0x1f707d7d9b0>,
<multiprocessing.pool.ApplyResult at 0x1f707d7d748>,
<multiprocessing.pool.ApplyResult at 0x1f707d7d710>,
<multiprocessing.pool.ApplyResult at 0x1f707d7dda0>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e0f0>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e358>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e320>,
<multiprocessing.pool.ApplyResult at 0x1f707d8e6a0>,
<multiprocessing.pool.ApplyResult at 0x1f707d936d8>,
<multiprocessing.pool.ApplyResult at 0x1f707d93eb8>]

How can I see the actual output?

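A new process starts with the parent's data, but when it tries to change that data it works on its own copy, and nothing is implicitly copied back when it exits. Threads in a ThreadPool do share the parent's memory, so that is not what empties dfsi here: apply_async is given ((sku,ids),), which passes one tuple to a function that takes two parameters, and the resulting TypeError stays hidden inside the ApplyResult until .get() is called. In both cases the robust approach is the same: return the result explicitly and then handle it in the parent.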
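To see what the ApplyResult objects from the question actually hold, a minimal sketch along these lines may help. ROWS and the min/max computation are hypothetical stand-ins for the real df and the np.percentile calls, assumed only so the example runs on its own; the pool calls themselves are the ones from the question.

```python
from multiprocessing.pool import ThreadPool

# Hypothetical stand-in for the real DataFrame: (sku, ids, value) rows.
ROWS = [
    (28075, 69, 4.0), (28075, 69, 9.5), (28075, 69, 1.5),
    (28075, 72, 7.0), (28075, 72, 3.0),
]
tup = [(28075, 69), (28075, 72)]


def multi_processing_tuples(sku, ids):
    # The 0th and 100th percentiles are simply the minimum and maximum.
    values = [v for s, i, v in ROWS if s == sku and i == ids]
    return (sku, ids, min(values), max(values))


with ThreadPool(5) as pool:
    # args=(sku, ids) unpacks into the two parameters; ((sku, ids),) would
    # pass a single tuple and raise a TypeError inside the worker.
    pending = [pool.apply_async(multi_processing_tuples, (sku, ids))
               for sku, ids in tup]
    # .get() blocks until the task finishes and returns its result,
    # or re-raises the exception that was raised in the worker.
    dfsi = [result.get() for result in pending]

print(dfsi)
```

Calling .get() is also the quickest way to surface a hidden error, since a failed task otherwise leaves no trace.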

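# imap passes each item of tup as a single argument, so unpack it inside.
def multi_processing_tuples(skid):
    sku, ids = skid
    Q0 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 0)
    Q4 = np.percentile(df[((df['sku'] == sku) & (df['ids'] == ids))], 100)
    return (sku, ids, Q0, Q4)

# Collect the returned tuples in the parent, in the same order as tup.
for data in pool.imap(multi_processing_tuples, tup):
    dfsi.append(data)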

Doing this returns the data from multi_processing_tuples, but you should probably also pass df in as an argument rather than reading it as a global.
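One possible way to pass the data in explicitly is functools.partial, sketched below. The rows list and the min_max_for_key helper are hypothetical illustrations, not part of the original code; with a real DataFrame the same pattern would bind df as the first argument.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def min_max_for_key(rows, skid):
    # rows is passed in explicitly instead of being read from a global.
    sku, ids = skid
    values = [v for s, i, v in rows if s == sku and i == ids]
    return (sku, ids, min(values), max(values))


# Hypothetical sample data standing in for df.
rows = [
    (28075, 69, 4.0), (28075, 69, 9.5), (28075, 69, 1.5),
    (28075, 72, 7.0), (28075, 72, 3.0),
]
tup = [(28075, 69), (28075, 72)]

with ThreadPool(5) as pool:
    # partial fixes the first argument; imap supplies each tuple as skid
    # and yields the results in the same order as tup.
    dfsi = list(pool.imap(partial(min_max_for_key, rows), tup))

print(dfsi)
```

Binding the data this way keeps the worker function free of hidden dependencies, which also makes it easier to move to a process pool later.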

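Edit: Also, you generally should not use threads for this. In CPython the global interpreter lock lets only one thread run Python bytecode at a time, so if you are trying to improve the runtime of a CPU-bound task you should use a process pool. Threading helps with IO-bound tasks.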
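A rough sketch of the process-pool variant follows, again with hypothetical sample data (ROWS, TUP) and a hypothetical helper (percentile_bounds) in place of the real df and np.percentile. The worker must be a top-level function so it can be pickled, and the pool is created under a __main__ guard, which platforms that spawn fresh interpreters require.

```python
from functools import partial
from multiprocessing import Pool

# Hypothetical sample data standing in for df.
ROWS = [
    (28075, 69, 4.0), (28075, 69, 9.5), (28075, 69, 1.5),
    (28075, 72, 7.0), (28075, 72, 3.0),
]
TUP = [(28075, 69), (28075, 72)]


def percentile_bounds(rows, skid):
    # Runs in a worker process; whatever it returns is pickled and
    # sent back to the parent.
    sku, ids = skid
    values = [v for s, i, v in rows if s == sku and i == ids]
    return (sku, ids, min(values), max(values))


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # map blocks until every task is done and preserves input order.
        dfsi = pool.map(partial(percentile_bounds, ROWS), TUP)
    print(dfsi)
```

Note that each task's arguments are pickled and sent to a worker, so for a very large dataset the transfer cost can outweigh the gain; whether processes actually help is worth measuring on the real data.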
