正在评估GroupBy计算的群集大小



免责声明第一:这里的结果和数字绝不是比较任何商业或非商业产品的基准

目前,我的团队正在Azure上使用Databricks执行数据工程任务,作为PoC,我们决定尝试Dask框架。因此,我们创建了一个与DBS上使用的大小几乎相同的专用k8s集群,并在那里部署了Dask——总工作RAM为512 GiB,84个工作人员(每个CPU一个工作人员(/8个CPU和32 GiB RAM用于调度器,并尝试在测试数据的子集上复制我们的一个数据流。

流程如下-我们从第三方收到镶木地板格式的设备性能日志。这些是独立的文件,不像一个单独的logs.parquet实体。在我们这边,我们收到一个csv文件,其中包含每月创建的所有prq文件路径。

我们的目标是将这些文件读取到DataFrame中,并相应地聚合它们——每个设备每班以1秒的速率(原始日志由纳秒速率的时间戳组成(。本质上,这是3列,以唯一的方式标识每一行。更准确地说,每个设备的每个移位都代替了上面提到的单个prq文件。

我非常依赖Coiled博客的内存使用报告

import numpy
def partition_report(ddf):
series = ddf.memory_usage_per_partition(deep=True).compute()
total = series.count()
print(f"Total number of partitions: {total}")
total_memory = format_bytes(series.sum())
print(f"Total DataFrame memory: {total_memory}")
total = total.astype(numpy.float64)
lt_1kb = series.where(lambda x : x < 1000).count()
lt_1kb_percentage = '{:.1%}'.format(lt_1kb/total)
lt_1mb = series.where(lambda x : x < 1000000).count()
lt_1mb_percentage = '{:.1%}'.format(lt_1mb/total)
gt_1gb = series.where(lambda x : x > 1000000000).count()
gt_1gb_percentage = '{:.1%}'.format(gt_1gb/total)
print(f"Num partitions < 1 KB: {lt_1kb} ({lt_1kb_percentage})")
print(f"Num partitions < 1 MB: {lt_1mb} ({lt_1mb_percentage})")
print(f"Num partitions > 1 GB: {gt_1gb} ({gt_1gb_percentage})")

分区函数

其在读取输入文件后表示数据帧由45k个分区组成,大小约为1,3 TiB,总行数约为1763000 000。当处理这些数据的子集时,明显的groupby工作得很好,但一旦我尝试处理大约1%或更多的数据,就会不断出现KilledWorker错误。土星云提供了一些关于如何利用split_out和split_every参数的输入,所以我决定将split_every设置为尽可能小的值,并根据分区的初始数量设置split_out。仍然会出现此错误group通过故障排除

df = pd.read_parquet( path= full_prq_paths             # a list of input prq files
, storage_options=storage_options  # credentials
#, gather_statistics=True
)
df_1s = df.groupby([df.device_id
, df.time_a_date
]
).agg(agg_columns                   # dictionary of columns and agg. functions
, split_out = split_amount    # split_amount = df.npartitions // 2
, split_every = 2

)

经过一些调查,并假设每个prq文件,即分区,代表单个泵的唯一移位,我决定首先使用map_partions,以具有较小的数据帧先验分组by:

df_int_1s = df.map_partitions(lambda part: part.groupby([part.time_a_date
])
.agg(agg_columns))

这是有效的,partition_report表示中间数据帧的大小为692.21 GiB,行数为936 000 000。然而,与最初的DBS结果相比,我少了1000行(可能在几个prq文件中出现了同一秒内的时间戳(。恢复到groupby并没有帮助——内存不足。

df_1s = df.groupby([df.description
, df.time_a_date
]
).agg(agg_columns
, split_out = split_amount
, split_every = 2

)

在所有这些测试中,我看到了大量数据泄露(仪表板上有黄色和灰色(。

我的问题是,应对这种群体性的下一步可能是什么?配置具有较少工作人员但每个工作人员具有更多CPU的集群更好吗?重新划分初始数据帧有时也会导致KilledWorker错误。此外,没有一个prq文件的大小超过300Mb,因此不会发生太多数据偏斜。突然增加集群大小就像是我们的最后手段,但我们需要在多大程度上增加RAM?

.groupby可能会有问题,假设数据在分区之间适当地分布,那么使用.map_partitions是可行的。

最初的DBS结果我有点偏离了1000行(可能在同一秒内的时间戳出现在几个prq文件中(。

这会促使对数据进行调查,可能在轮班开始/结束时(或其他一些重要时间,如午夜(存在问题。

最新更新