Processing a dask dataframe in chunks of rows



I have a dask dataframe created with a specific blocksize:

df = dd.read_csv(filepath, blocksize = blocksize * 1024 * 1024)

I can process it partition by partition like this:

partial_results = []
for partition in df.partitions:
    partial = trivial_func(partition[var])
    partial_results.append(partial)
result = delayed(sum)(partial_results)

(I tried to use map_partitions here, but ended up just using a for loop instead.) Everything works fine up to this point.
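
For reference, the map_partitions version would look roughly like this; a minimal, self-contained sketch where the column 'x' and a simple sum stand in for var and trivial_func:

import pandas as pd
import dask.dataframe as dd

# stand-ins for the question's df, var and trivial_func
df = dd.from_pandas(pd.DataFrame({'x': range(100)}), npartitions=4)
var = 'x'

def trivial_func(series):
    # reduce one pandas Series partition to a single value
    return series.sum()

# one partial value per partition, combined after compute
partials = df[var].map_partitions(trivial_func).compute()
result = partials.sum()  # 4950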

Now I need to run a function on the same data, but this function has to receive a fixed number of rows of the dataframe at a time (for example, rows_per_chunk=60). Is that achievable? With pandas I would do it like this:

partial_results = []
for i in range(int(len_df/rows_per_chunk)): # I think ceil would be better if decimal
    arg_data = df.iloc[i*rows_per_chunk:(i+1)*rows_per_chunk]
    partial = not_so_trivial_func(arg_data)
    partial_results.append(partial)
result = sum(partial_results)

Is it possible to do something like this with dask? I know that iloc cannot be used because of lazy evaluation, but is there a way to partition the dataframe differently? If not, what is the most efficient way to achieve this with dask? The dataframe has several million rows.

You can repartition the dataframe along divisions, which define how index values are allocated across partitions (this assumes a unique index).

import dask.dataframe as dd
import pandas as pd
df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)
# there will be 5 rows per partition
print(ddf.map_partitions(len).compute())
# you can see that ddf is split along these index values
print(ddf.divisions)
# change the divisions to have the desired spacing
new_divisions = (0, 3, 6, 9, 12, 14)
new_ddf = ddf.repartition(divisions=new_divisions)
# now there will be 3 rows per partition
print(new_ddf.map_partitions(len).compute())
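
The spacing does not have to be hardcoded: for an arbitrary chunk size like the question's rows_per_chunk, the divisions can be generated from the last index value. A sketch continuing the snippet above, assuming a unique, monotonically increasing integer index:

# continue from the snippet above
rows_per_chunk = 4  # the question uses 60; 4 keeps the toy example readable
last_index = ddf.index.max().compute()  # 14 here
new_divisions = list(range(0, last_index, rows_per_chunk)) + [last_index]
new_ddf = ddf.repartition(divisions=new_divisions)
# 4, 4, 4 and 3 rows per partition
print(new_ddf.map_partitions(len).compute())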

If the index is unknown, you can create a new index (assuming the rows do not need to be sorted) and repartition along the computed divisions:

import dask.dataframe as dd
import pandas as pd
# save some data into unindexed csv
num_rows = 15
df = pd.DataFrame(range(num_rows), columns=['x'])
df.to_csv('dask_test.csv', index=False)

# read from csv
ddf = dd.read_csv('dask_test.csv', blocksize=10)
# assume that rows are already ordered (so no sorting is needed)
# then can modify the index using the lengths of partitions
cumlens = ddf.map_partitions(len).compute().cumsum()
# since processing will be done on a partition-by-partition basis, save them
# individually
new_partitions = [ddf.partitions[0]]
for npart, partition in enumerate(ddf.partitions[1:].partitions):
    partition.index = partition.index + cumlens[npart]
    new_partitions.append(partition)
# this is our new ddf
ddf = dd.concat(new_partitions)
#  set divisions based on cumulative lengths
ddf.divisions = tuple([0] + cumlens.tolist())
# change the divisions to have the desired spacing
new_partition_size = 12
max_rows = cumlens.tolist()[-1]
new_divisions = list(range(0, max_rows, new_partition_size))
if new_divisions[-1] < max_rows:
    new_divisions.append(max_rows)
new_ddf = ddf.repartition(divisions=new_divisions)
# now there will be desired rows per partition
print(new_ddf.map_partitions(len).compute())
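
Either way, once new_ddf has the desired number of rows per partition, the question's per-chunk function can be applied partition by partition, mirroring the pandas loop. A self-contained sketch, where not_so_trivial_func is a placeholder that receives one pandas DataFrame chunk:

import pandas as pd
import dask.dataframe as dd
from dask import delayed

# placeholder for the question's not_so_trivial_func: it is called with one
# pandas DataFrame chunk containing (up to) rows_per_chunk rows
def not_so_trivial_func(pdf_chunk):
    return pdf_chunk['x'].sum()

ddf = dd.from_pandas(pd.DataFrame({'x': range(15)}), npartitions=3)
new_ddf = ddf.repartition(divisions=(0, 6, 12, 14))  # 6, 6 and 3 rows per chunk

# one delayed call per partition, then combine, as in the question
partial_results = [delayed(not_so_trivial_func)(part) for part in new_ddf.partitions]
result = delayed(sum)(partial_results).compute()
print(result)  # 105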
