Multiprocessing / for loop is randomly skipping elements



The dataset has billions of data points per pair. I tried a multiprocessing loop to speed it up. Why does the multiprocessing/for loop skip some of the elements in `pairs`? Every time I run it again, it randomly skips a different set of names and then the code finishes.


import pandas as pd
import pickle
import time
import concurrent.futures

start = time.perf_counter()

pairs = ['GBPUSD', 'AUDUSD', 'EURUSD', 'EURJPY', 'GBPJPY', 'USDJPY', 'USDCAD', 'EURGBP']

def pickling_joined(p):
    df = pd.read_csv(rf'C:\Users\Ghosh\Downloads\dataset\data_joined\{p}.csv')
    df['LTP'] = (df['Bid'] + df['Ask']) / 2  # mid-price between bid and ask
    print(f'\n=====>> Converting Date format for {p} ....')
    df['Date'] = df['Date'].apply(pd.to_datetime)
    print(f'\n=====>> Date format converted for {p} ....')
    df.set_index('Date', inplace=True)
    df = pd.DataFrame(df)
    with open(rf'C:\Users\Ghosh\Downloads\dataset\data_pickled\{p}.pkl', 'wb') as pickle_file:
        pickle.dump(df, pickle_file)
    print(f'\n=====>> Pickling done for {p} !!!')

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(pickling_joined, pairs)
    finish = time.perf_counter()
    print(f'Finished in {finish - start} seconds')
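
One thing worth checking before switching libraries: `executor.map` returns a lazy iterator, and an exception raised inside a worker process is only re-raised when its result is consumed. Since the results are never iterated above, any pair whose worker crashes (unreadable file, parse error, a killed process) fails silently and simply looks "skipped", with a different victim each run. A minimal diagnostic sketch, assuming the same `pairs` list and `pickling_joined` function, that surfaces per-pair failures:

import concurrent.futures
import traceback

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit() returns one Future per pair, so each failure is visible
        futures = {executor.submit(pickling_joined, p): p for p in pairs}
        for fut in concurrent.futures.as_completed(futures):
            pair = futures[fut]
            try:
                fut.result()  # re-raises any exception from the worker
                print(f'OK: {pair}')
            except Exception:
                print(f'FAILED: {pair}')
                traceback.print_exc()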

Python doesn't handle threading/multiprocessing of heavy files well, so I'd suggest using Dask here. Dask uses a cluster of workers and behaves like multiprocessing while taking less time, and on top of that you can still use multiprocessing to run it even faster.

import pickle
import time
import concurrent.futures
import dask.dataframe as dd

start = time.perf_counter()

pairs = ['GBPUSD', 'AUDUSD', 'EURUSD', 'EURJPY', 'GBPJPY', 'USDJPY', 'USDCAD', 'EURGBP']

def pickling_joined(p):
    df = dd.read_csv(rf'C:\Users\Ghosh\Downloads\dataset\data_joined\{p}.csv')
    df['LTP'] = (df['Bid'] + df['Ask']) / 2
    print(f'\n=====>> Converting Date format for {p} ....')
    df['Date'] = dd.to_datetime(df.Date)
    print(f'\n=====>> Date format converted for {p} ....')
    df = df.set_index('Date', sorted=True)
    df = df.compute()  # materialise the lazy Dask graph into a pandas DataFrame
    with open(rf'C:\Users\Ghosh\Downloads\dataset\data_pickled\{p}.pkl', 'wb') as pickle_file:
        pickle.dump(df, pickle_file)
    print(f'\n=O=O=O=O=O>> Pickling done for {p} !!!')

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(pickling_joined, pairs)
    finish = time.perf_counter()
    print(f'\nFinished in {finish - start} seconds')
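
A note on the sketch above: `dd.read_csv` already splits each CSV into partitions, and `.compute()` runs those partition tasks in parallel (Dask dataframes use a thread-based scheduler by default). Wrapping that in a `ProcessPoolExecutor` stacks two layers of parallelism, which can oversubscribe the CPU. An alternative sketch, assuming the same `pairs` list and the Dask version of `pickling_joined`, is to process one file at a time and let Dask parallelise within each file:

if __name__ == '__main__':
    start = time.perf_counter()
    for p in pairs:
        # Dask parallelises across the partitions of each CSV internally,
        # so no outer process pool is needed here.
        pickling_joined(p)
    finish = time.perf_counter()
    print(f'\nFinished in {finish - start} seconds')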

Java would be a better choice for this kind of operation; with a large DF, Python will always skip steps.
