Python-CSV文件与数据流模板的Dict



我正在尝试使用Dataflow模板和Python将CSV文件处理为dict。

由于这是一个模板,我必须使用textio模块中的ReadFromText,才能在运行时提供路径。

| beam.io.ReadFromText(contact_options.path)

我所需要的就是能够提取这个text.csv文件的第一行,然后我可以在DictReader中使用这些数据作为字段名。

如果我使用拆分行,它会带回列表中文本文件的每个元素:

return element.splitlines()

csv_data = []
split_element = element.split('n')
for row in split_element:
csv_data.append(row)
return csv_data
['phone_number', 'cid', 'first_name', 'last_name']
['          ', '101XXXXX', 'MurXXX', 'LevXXXX']
['3052XXXXX', '109XXXXX', 'MerXXXX', 'CoXXXX']
['954XXXXX', '10XXXXXX', 'RoXXXX', 'MaXXXXX']

尽管如果我使用say元素[0],它只是在没有列表括号的情况下将所有内容都带回来。我还尝试过用"\n"进行拆分,然后使用for循环生成列表对象,尽管它生成的结果几乎相同。

我不能依赖于使用预先确定的字段名,因为要处理的csv文件都会有不同的字段名。如果没有给定的字段名称,DictReader将无法有效工作。

编辑:

预期输出为:

[{'phone_Number': '561XXXXX', 'first_Name': '', 'last_Name': 'BeXXXX', 'cid': '745XXXXX'}, {'phone_Number': '561XXXXX', 'first_Name': 'A', 'last_Name': 'BXXXX', 'cid': '61XXXXX'}]

编辑:

元素内容:

"phone_Number","cid","first_Name","last_Name"
"5616XXXXX","745XXXX","","BeXXXXX"
"561XXXXXX","61XXXXX","A","BXXXXXXt"
"95XXXXXXX","6XXXXXX","A","BXXXXXX"
"727XXXXXX","98XXXXXX","A","CaXXXXXX"

使用Pandas加载值,并使用第一行作为colheaders

import pandas as pd
a_big_list=[['phone_number', 'cid', 'first_name', 'last_name'],
['          ', '101XXXXX', 'MurXXX', 'LevXXXX'],
['3052XXXXX', '109XXXXX', 'MerXXXX', 'CoXXXX'],
['954XXXXX', '10XXXXXX', 'RoXXXX', 'MaXXXXX']]
df=pd.DataFrame(a_big_list[1:],columns=a_big_list[0])
df.to_dict('records')
#[{'cid': '101XXXXX',
'first_name': 'MurXXX',
'last_name': 'LevXXXX',
'phone_number': '          '},
{'cid': '109XXXXX',
'first_name': 'MerXXXX',
'last_name': 'CoXXXX',
'phone_number': '3052XXXXX'},
{'cid': '10XXXXXX',
'first_name': 'RoXXXX',
'last_name': 'MaXXXXX',
'phone_number': '954XXXXX'}]

我从@mad_的答案中得到了灵感,从而解决了这个问题,但这仍然没有给我最初的正确答案,因为我需要首先将我的pcollection分组为一个元素。我从马佳缘的回答中找到了一种方法,并将其稍作修改:

class Group(beam.DoFn):
def __init__(self):
self._buffer = []
def process(self, element):
self._buffer.append(element)
def finish_bundle(self):
if len(self._buffer) != 0:
yield list(self._buffer)
self._buffer = []
lines = p | 'File reading' >> ReadFromText(known_args.input)
| 'Group' >> beam.ParDo(Group(known_args.N)
...

因此,它将整个CSV文件分组为一个对象,然后我能够应用mad_的方法将其转换为字典。

最新更新