multiprocess.pool.map() raises ValueError: No objects to concatenate



I have to run a for loop in which each iteration fetches data from a database, does some operations, runs Dijkstra's algorithm, and appends the result to a final list. The code is as follows:

def log_transform(x):
    transformed = math.e**(-x)
    return transformed

input_region = '1.199'
t1 = '20200101'
t2 = '20200115'
candid_sale_invoices = pd.read_excel('candid_sale_invoices.xlsx')
candid_barcodes = pd.read_excel('candid_barcodes.xlsx')
weights = []
for i in range(int(t1), (int(t2) + 1)):
    input_date = str(i)
    sql_data = """select trim(cast(p.Barcode as nvarchar(20))) Barcode, cast(s.invoiceid as nvarchar(20)) invoiceid
                  from sales s inner join Product_981115 p on s.productid = p.productid
                  where s.date = """ + input_date + """ and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """ + input_region
    data = []
    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)
    data = pd.concat(data, ignore_index=True)
    data = data.merge(candid_sale_invoices)
    data = data.merge(candid_barcodes)
    final_edges_df = data.iloc[:, [2, 3, 4]]
    final_edges_tuples = [tuple(x) for x in final_edges_df.values]
    Gm = ig.Graph.TupleList(final_edges_tuples, directed=True, edge_attrs=['weight'])
    longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None, None, weights='weight'))
    longest_paths = longest_paths.swifter.apply(log_transform)
    longest_paths["Date"] = input_date
    longest_paths["RegionID"] = input_region
    weights.append(longest_paths)
weights = pd.concat(weights, ignore_index=True)

The problem is the processing time: it takes several hours to finish. Since each iteration is independent of the others, I decided to run the loop in parallel with the help of this link:

import psutil
from multiprocess import Pool
pool = Pool(psutil.cpu_count(logical=False))

def graph_analysis(i):
    input_date = str(i)
    sql_data = """select trim(cast(p.Barcode as nvarchar(20))) Barcode, cast(s.invoiceid as nvarchar(20)) invoiceid
                  from sales s inner join Product_981115 p on s.productid = p.productid
                  where s.date = """ + input_date + """ and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """ + input_region
    data = []
    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)
    data = pd.concat(data, ignore_index=True)
    data = data.merge(candid_sale_invoices)
    data = data.merge(candid_barcodes)
    final_edges_df = data.iloc[:, [2, 3, 4]]
    final_edges_tuples = [tuple(x) for x in final_edges_df.values]
    Gm = ig.Graph.TupleList(final_edges_tuples, directed=True, edge_attrs=['weight'])
    longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None, None, weights='weight'))
    longest_paths = longest_paths.swifter.apply(log_transform)
    longest_paths["Date"] = input_date
    longest_paths["RegionID"] = input_region
    return longest_paths

results = pool.map(graph_analysis, range(int(t1), (int(t2) + 1)))
pool.close()

When I run the code, it seems to do its work and compute in parallel, but after a while it fails with the following error:

Traceback (most recent call last):

  File "", line 78, in <module>
    weights = pool.map(graph_analysis, range(int(t1), (int(t2) + 1)))

  File "C:\Users\AppData\Local\Continum\anaconda3\lib\site-packages\multiprocess\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()

  File "C:\Users\AppData\Local\Continum\anaconda3\lib\site-packages\multiprocess\pool.py", line 657, in get
    raise self._value

ValueError: No objects to concatenate

Is this error related to collecting the "longest_paths" DataFrames from all the iterations?

"No objects to concatenate" is a pandas error raised when you call pd.concat() with an empty iterable:

>>> pd.concat([])
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "pandas/core/reshape/concat.py", line 281, in concat
sort=sort,
File "pandas/core/reshape/concat.py", line 329, in __init__
raise ValueError("No objects to concatenate")
ValueError: No objects to concatenate
>>>

I think you just need to return early when the SQL query returns nothing:

# ...
for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
    data.append(chunk)
if not data:  # <-- add this bit!
    return None
data = pd.concat(data, ignore_index=True)
# ...

I'd also suggest using pool.imap_unordered(); and you don't need psutil either, since Pool defaults to the number of CPUs anyway.

All in all, something like this. Note that I changed the return type to (i, x), so you also get back the index that was passed in. Of course, this still lacks saving the actual results anywhere. :(

from multiprocess import Pool

def log_transform(x):
    transformed = math.e ** (-x)
    return transformed

input_region = "1.199"
t1 = "20200101"
t2 = "20200115"
candid_sale_invoices = pd.read_excel("candid_sale_invoices.xlsx")
candid_barcodes = pd.read_excel("candid_barcodes.xlsx")

def graph_analysis(i):
    input_date = str(i)
    sql_data = (
        """select trim(cast(p.Barcode as nvarchar(20))) Barcode, cast(s.invoiceid as nvarchar(20)) invoiceid
           from sales s inner join Product_981115 p on s.productid = p.productid
           where s.date = """
        + input_date
        + """ and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """
        + input_region
    )
    data = []
    for chunk in pd.read_sql(sql_data, conn, chunksize=1000000):
        data.append(chunk)
    if not data:
        return (i, None)
    data = pd.concat(data, ignore_index=True)
    data = data.merge(candid_sale_invoices)
    data = data.merge(candid_barcodes)
    final_edges_df = data.iloc[:, [2, 3, 4]]
    final_edges_tuples = [tuple(x) for x in final_edges_df.values]
    Gm = ig.Graph.TupleList(final_edges_tuples, directed=True, edge_attrs=["weight"])
    longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None, None, weights="weight"))
    longest_paths = longest_paths.swifter.apply(log_transform)
    longest_paths["Date"] = input_date
    longest_paths["RegionID"] = input_region
    return (i, longest_paths)

if __name__ == "__main__":
    with Pool() as pool:
        for i, result in pool.imap_unordered(graph_analysis, range(int(t1), (int(t2) + 1))):
            print(i, result)
