Reduce memory usage with json.loads plus multiprocessing or Dask in Python



I have a CSV file with 1 million rows, about 3 GB on disk. I convert it to a DataFrame with pandas read_csv, which works fine.

Next, I have to format the data columns and append another column based on the values of some existing columns. To do this, I am using a Dask DataFrame with npartitions and then applying a function row by row. Our instance has 7.5 GB of RAM, but the job hangs and the process is killed with a MemoryError.

Here is the code where I format the data columns:

import pandas as pd
import json
import dask.dataframe as dd
import multiprocessing

def formatting_data(data):
    print("cleaning and formatting data")
    data["IsBadJson"] = False
    data["BadJsonStr"] = None
    # Split the in-memory pandas DataFrame into partitions and parse
    # each row in parallel across worker processes.
    data = (
        dd.from_pandas(data, npartitions=4 * multiprocessing.cpu_count())
        .map_partitions(lambda df: df.apply(lambda row: parse_data_to_json(row), axis=1))
        .compute(scheduler='processes')
    )
    return data

Below is the code for the function parse_data_to_json that we use for the formatting:

def parse_data_to_json(x):
    # Parse the RequestSent column, flagging rows whose value is not valid JSON.
    try:
        if x.get("RequestSent") == "nan":
            # Record the raw value before replacing it with None.
            x["BadJsonStr"] = str(x.get("RequestSent"))
            x["RequestSent"] = None
            x["IsBadJson"] = True
        else:
            x["RequestSent"] = json.loads(x.get("RequestSent"))
            x["IsBadJson"] = False
            x["BadJsonStr"] = None
    except Exception as error:
        print("Found an error value in Tax Json field RequestSent: {}, error details: {}".format(x.get("RequestSent"), error))
        print("{}-{}-{}".format(None, True, str(x.get("RequestSent"))))
        # Record the offending value before overwriting it with None.
        x["BadJsonStr"] = str(x.get("RequestSent"))
        x["RequestSent"] = None
        x["IsBadJson"] = True

    # Parse the ResponseReceived column in the same way.
    try:
        if x.get("ResponseReceived") == "nan":
            x["BadJsonStr"] = str(x.get("ResponseReceived"))
            x["ResponseReceived"] = None
            x["IsBadJson"] = True
        else:
            x["ResponseReceived"] = json.loads(x.get("ResponseReceived"))
            # Do not reset IsBadJson/BadJsonStr here, or a bad RequestSent
            # flagged above would be clobbered.
    except Exception as error:
        print("Found an error value in Tax Json field ResponseReceived: {}, error details: {}".format(x.get("ResponseReceived"), error))
        print("{}-{}-{}".format(None, True, str(x.get("ResponseReceived"))))
        x["BadJsonStr"] = str(x.get("ResponseReceived"))
        x["ResponseReceived"] = None
        x["IsBadJson"] = True
    return x
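
As a quick sanity check, the function can be run on a single hand-made row; the values below are illustrative only, not real data:

import pandas as pd

# Made-up row: RequestSent holds valid JSON, ResponseReceived does not.
row = pd.Series({
    "RequestSent": '{"amount": 100}',
    "ResponseReceived": "not-json",
    "IsBadJson": False,
    "BadJsonStr": None,
})
row = parse_data_to_json(row)
print(row["RequestSent"])  # {'amount': 100} -- parsed into a dict
print(row["IsBadJson"])    # True, because ResponseReceived failed json.loads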

I recommend letting Dask load the data directly from the CSV rather than handing it a pandas DataFrame that is already in memory: with dd.from_pandas plus scheduler='processes', the whole 3 GB frame stays in the parent process and every partition additionally has to be pickled over to the worker processes, which is what exhausts your 7.5 GB. See https://docs.dask.org/en/latest/best-practices.html#load-data-with-dask
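
A minimal sketch of that approach, reusing your parse_data_to_json. The file name data.csv, the blocksize, and reading every column as str are assumptions here, not taken from your question; adjust them to your schema:

import dask.dataframe as dd

def formatting_data_from_csv(path):
    # Let Dask read the CSV lazily in ~64 MB blocks, so no single worker
    # ever has to hold the whole 3 GB file at once.
    ddf = dd.read_csv(path, blocksize="64MB", dtype=str)
    ddf["IsBadJson"] = False
    ddf["BadJsonStr"] = None
    # Same row-wise parsing as before, but on partitions Dask created itself.
    ddf = ddf.map_partitions(lambda df: df.apply(parse_data_to_json, axis=1))
    return ddf.compute(scheduler='processes')

data = formatting_data_from_csv("data.csv")  # hypothetical path

Dask may warn that it cannot infer the output metadata of the lambda; passing an explicit meta= (an empty pandas DataFrame with the expected columns and dtypes) to map_partitions silences that. Also note that .compute() still concatenates every partition into a single pandas DataFrame at the end, so if the parsed result is itself close to RAM size, writing it out with ddf.to_parquet(...) instead of computing keeps memory flat.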
