How to explode a dict (or list of dicts) column into multiple columns in dask.dataframe



While converting some XML to a dataframe with xmltodict, it turns out that one particular column contains all the information I need, as a dict or a list of dicts. I can convert this column into multiple columns with pandas, but I can't do the equivalent in dask.

I can't use meta=, because I don't know all the fields that may appear in the XML, and dask is required because the real XML files are each larger than 1 GB.

example.xml:

<?xml version="1.0" encoding="UTF-8"?>
<itemList>
  <eventItem uid="1">
    <timestamp>2019-07-04T09:57:35.044Z</timestamp>
    <eventType>generic</eventType>
    <details>
      <detail>
        <name>columnA</name>
        <value>AAA</value>
      </detail>
      <detail>
        <name>columnB</name>
        <value>BBB</value>
      </detail>
    </details>
  </eventItem>
  <eventItem uid="2">
    <timestamp>2019-07-04T09:57:52.188Z</timestamp>
    <eventType>generic</eventType>
    <details>
      <detail>
        <name>columnC</name>
        <value>CCC</value>
      </detail>
    </details>
  </eventItem>
</itemList>
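For reference, this is the shape xmltodict produces for that file: a single <detail> child parses to a plain dict, while repeated children parse to a list of dicts, which is why the details column is heterogeneous. A sketch using an inlined copy of the XML:

```python
import xmltodict

# Inlined copy of example.xml so this snippet is self-contained.
xml = """<?xml version="1.0" encoding="UTF-8"?>
<itemList>
  <eventItem uid="1">
    <timestamp>2019-07-04T09:57:35.044Z</timestamp>
    <eventType>generic</eventType>
    <details>
      <detail><name>columnA</name><value>AAA</value></detail>
      <detail><name>columnB</name><value>BBB</value></detail>
    </details>
  </eventItem>
  <eventItem uid="2">
    <timestamp>2019-07-04T09:57:52.188Z</timestamp>
    <eventType>generic</eventType>
    <details>
      <detail><name>columnC</name><value>CCC</value></detail>
    </details>
  </eventItem>
</itemList>"""

items = xmltodict.parse(xml)["itemList"]["eventItem"]
# Two <detail> children -> "detail" is a list of dicts.
print(type(items[0]["details"]["detail"]))
# A single <detail> child -> "detail" is a single dict.
print(type(items[1]["details"]["detail"]))
```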

Working pandas code:

import xmltodict
import collections
import pandas as pd

def pd_output_dict(details):
    detail = details.get("detail", [])
    ret_value = {}
    if type(detail) in (collections.OrderedDict, dict):
        ret_value[detail["name"]] = detail["value"]
    elif type(detail) == list:
        for i in detail:
            ret_value[i["name"]] = i["value"]
    return pd.Series(ret_value)

with open("example.xml", "r", encoding="utf8") as f:
    df_dict_list = xmltodict.parse(f.read()).get("itemList", {}).get("eventItem", [])

df = pd.DataFrame(df_dict_list)
df = pd.concat([df, df.apply(lambda row: pd_output_dict(row.details), axis=1, result_type="expand")], axis=1)
print(df.head())

Non-working dask code:

import xmltodict
import collections
import dask
import dask.bag as db
import dask.dataframe as dd

def dd_output_dict(row):
    detail = row.get("details", {}).get("detail", [])
    ret_value = {}
    if type(detail) in (collections.OrderedDict, dict):
        row[detail["name"]] = detail["value"]
    elif type(detail) == list:
        for i in detail:
            row[i["name"]] = i["value"]
    return row

with open("example.xml", "r", encoding="utf8") as f:
    df_dict_list = xmltodict.parse(f.read()).get("itemList", {}).get("eventItem", [])

df_bag = db.from_sequence(df_dict_list)
df = df_bag.to_dataframe()
df = df.apply(lambda row: dd_output_dict(row), axis=1)

The idea is to get the same kind of result in dask that I get in pandas, but at the moment I get an error:

>>> df = df.apply(lambda row: output_dict(row), axis=1)
Traceback (most recent call last):
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\utils.py", line 169, in raise_on_meta_error
    yield
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\core.py", line 4711, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "C:\Anaconda3\lib\site-packages\dask\utils.py", line 854, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "C:\Anaconda3\lib\site-packages\pandas\core\frame.py", line 6487, in apply
    return op.get_result()
  File "C:\Anaconda3\lib\site-packages\pandas\core\apply.py", line 151, in get_result
    return self.apply_standard()
  File "C:\Anaconda3\lib\site-packages\pandas\core\apply.py", line 257, in apply_standard
    self.apply_series_generator()
  File "C:\Anaconda3\lib\site-packages\pandas\core\apply.py", line 286, in apply_series_generator
    results[i] = self.f(v)
  File "<stdin>", line 1, in <lambda>
  File "<stdin>", line 4, in output_dict
AttributeError: ("'str' object has no attribute 'get'", 'occurred at index 0')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\core.py", line 3964, in apply
    M.apply, self._meta_nonempty, func, args=args, udf=True, **kwds
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\core.py", line 4711, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "C:\Anaconda3\lib\contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\utils.py", line 190, in raise_on_meta_error
    raise ValueError(msg)
ValueError: Metadata inference failed in `apply`.

You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
AttributeError("'str' object has no attribute 'get'", 'occurred at index 0')

Traceback:
---------
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\utils.py", line 169, in raise_on_meta_error
    yield
  File "C:\Anaconda3\lib\site-packages\dask\dataframe\core.py", line 4711, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "C:\Anaconda3\lib\site-packages\dask\utils.py", line 854, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "C:\Anaconda3\lib\site-packages\pandas\core\frame.py", line 6487, in apply
    return op.get_result()
  File "C:\Anaconda3\lib\site-packages\pandas\core\apply.py", line 151, in get_result
    return self.apply_standard()
  File "C:\Anaconda3\lib\site-packages\pandas\core\apply.py", line 257, in apply_standard
    self.apply_series_generator()
  File "C:\Anaconda3\lib\site-packages\pandas\core\apply.py", line 286, in apply_series_generator
    results[i] = self.f(v)
  File "<stdin>", line 1, in <lambda>
  File "<stdin>", line 4, in output_dict

Right, so operations like map_partitions need to know the column names and data types. As mentioned, you can specify this with the meta= keyword.

Perhaps you could run through the data once to work out what these are, then construct an appropriate meta object and pass it in? This is inefficient, since it requires reading through all of your data, but I'm not sure there is another way.
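A sketch of that two-pass approach, under the assumption that dask.bag can hold the parsed records; the helper names (detail_list, flatten, dynamic_columns) are mine, and the XML is inlined rather than read from example.xml so the snippet is self-contained:

```python
import pandas as pd
import xmltodict
import dask.bag as db

# Same data as example.xml, inlined for self-containment.
xml = """<?xml version="1.0" encoding="UTF-8"?>
<itemList>
  <eventItem uid="1">
    <timestamp>2019-07-04T09:57:35.044Z</timestamp>
    <eventType>generic</eventType>
    <details>
      <detail><name>columnA</name><value>AAA</value></detail>
      <detail><name>columnB</name><value>BBB</value></detail>
    </details>
  </eventItem>
  <eventItem uid="2">
    <timestamp>2019-07-04T09:57:52.188Z</timestamp>
    <eventType>generic</eventType>
    <details>
      <detail><name>columnC</name><value>CCC</value></detail>
    </details>
  </eventItem>
</itemList>"""

records = xmltodict.parse(xml)["itemList"]["eventItem"]
bag = db.from_sequence(records)

def detail_list(record):
    # Normalise the single-dict case to a list of dicts.
    detail = record.get("details", {}).get("detail", [])
    return detail if isinstance(detail, list) else [detail]

# Pass 1: read through the data once to discover every possible field name.
dynamic_columns = sorted(set(
    bag.map(lambda r: [d["name"] for d in detail_list(r)]).flatten().compute()
))

# Pass 2: flatten each record into the fixed schema discovered above.
def flatten(record):
    values = {d["name"]: d["value"] for d in detail_list(record)}
    out = {"uid": record.get("@uid"),
           "timestamp": record.get("timestamp"),
           "eventType": record.get("eventType")}
    for name in dynamic_columns:
        out[name] = values.get(name)
    return out

# Build the meta from the discovered columns so no inference is needed.
meta = pd.DataFrame({c: pd.Series(dtype="object")
                     for c in ["uid", "timestamp", "eventType"] + dynamic_columns})
df = bag.map(flatten).to_dataframe(meta=meta)
print(df.compute())
```

The first pass is the expensive part the answer warns about; on the real >1 GB files it means parsing everything twice, but it is what guarantees a complete, consistent set of columns across partitions.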
