在对 Dask 包中的 JSON 记录进行重复数据删除时,是否有比 .distinct() 更好的选择 - 一个减少内存



我一直在尝试处理几GB的文本文件,每行都包含JSON记录。在处理某些文件时,我发现了一些重复的记录。每条记录上都有唯一的 ID(键名 UID(,因此识别重复应该很容易。

我尝试将所有JSON导入Dask包,过滤到我感兴趣的键,然后运行.distinct((。链如下所示:

def get_keys(record):
return (r['UID'],r['DATETIME'],r['USER'],r['PAGEID'],r['ACTION'])
items = (db.read_text('/*.json')
.map(json.loads)
.map(get_keys)
.distinct()
.to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
.compute())
items.to_csv('deduped_items.csv')

我在笔记本电脑上运行它,所以是本地客户端。最终发生的事情是工作线程在达到 95% 的内存使用率时重新启动。在引发异常之前,它似乎重试了几次。在观察 Dask 仪表板时,我看到所有来自 bag-from-delay 的分区都得到了处理,大部分不同部分都得到了处理,然后它无法通过 distinct-agg。

上面的代码块应该工作,还是我总是遇到内存限制

--编辑

在了解了不同的关键参数后,我正在重新运行它。如果有帮助,将使用结果进行更新。

.distinct(key=lambda x:x[0])

-- 编辑 2

不同的 with 键也耗尽了内存。我现在要重新排列到这个:

.map(get_keys)
.to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
.drop_duplicates()
.compute())

它现在正在运行,任务图看起来相同,但它似乎运行得更快。

-- 编辑 3

这也无济于事。我开始相信,相对于总记录数的重复数使这变得困难。粗略估计,大约有 5000 万条记录和猜测不到 10k 重复。

-- 编辑 4

我现在正在尝试将 UID 设置为索引并执行map_partitions。我的理解是,重复的 UID 将保证存在于同一个分区中?

items = (db.read_text('/*.json')
.map(json.loads)
.map(get_keys)
.to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
.set_index('UID')
.map_partitions(lambda x: x.drop_duplicates)
.compute())

我相信dask.bag中的独特调用是相当幼稚的。 它获取每个分区的唯一元素,然后将这些元素合并到单个分区中,并获取这些元素的不同元素。 仅当您希望结果适合内存时,才合理使用。

最新更新