从解析的CSV创建嵌套的字典和列表



我一直在从事一个项目,该项目涉及解析CSV文件,以便按照复杂的模式将所有数据转换为非常具体格式化的JSON。我必须定制这个程序,因为所需的JSON复杂性使现有的转换器失败。我基本上做到了,但我遇到了最后一个障碍:

我有嵌套的字典,偶尔必须在其中有一个列表,这个列表将包含更多的字典。这很好,我已经能够完成,但现在我需要找到一种方法来添加更多的嵌套字典。以下是该概念的简化分解:

CSV将看起来像这样,其中标签前的@表示它是一个列表

x.a, x.b.z, x.b.y, x.@c.z.nest1, x.@c.z.nest2, x.@c.yy, x.d, x.e.z, x.e.y
ab, cd, ef, gh, ij, kl, mn, op, qr

这应该会产生以下JSON

{
"x": {
"a": "ab",
"b": {
"z": "cd",
"y": "ef"
},
"c": [
{
"z": {
"nest1": "gh",
"nest2": "ij"
}
},
{
"yy": "kl"
}
],
"d": "mn",
"e": {
"z": "op",
"y": "qr"
}
}
}

这是一个我无法解决的问题,我目前的代码只能在列表项之后做一个字典,不能再做了。我还需要能够以某种方式在字典列表中执行以下操作:

"c": [
{
"z": {
"nest1": "gh"
},
"zz": {
"nest2": "ij"
}
},
{
"yy": "kl"
}

。以某种方式在列表中的字典中添加多个嵌套字典。这样做的问题在于它们不能按名称引用,所以我不知道如何在CSV格式中指示这样做。

下面是我的代码,它工作到嵌套在列表中的第一个字典:

import json
import pandas as pd
from os.path import exists
# df1 = pd.read_csv("excelTestFacilities.csv", header = 1, sep=",", keep_default_na=False, engine="python")
# df2 = pd.read_csv("excelTestFacilityContacts.csv", header = 1, sep=",", keep_default_na=False, engine="python")
# df = pd.merge(df1, df2, how = 'inner')
df = pd.read_csv("csvTestFile.csv", header = 1, sep=", ", keep_default_na=False, engine="python")

#print(df) # uncomment to see the transformation
json_data = df.to_dict(orient="records")
#print(json_data)

def unflatten_dic(dic):
"""
Unflattens a CSV list into a set of nested dictionaries
"""
ini = {}
for k,v in list(dic.items()):
node = ini
list_bool = False
*parents, key = k.split('.')
for parent in parents:
if parent[0] == '@':
list_bool = True
if list_bool:
for parent in parents:
if parent[0] == '@':
node[parent[1:]] = node = node.get(parent[1:], [])
else: 
node[parent] = node = node.get(parent, {})
node.append({key : v})
else:
for parent in parents:
node[parent] = node = node.get(parent, {})
node[key] = v
return ini

def merge_lists(dic):
"""
Removes duplicates within sets
"""
for k,v in list(dic.items()):
if isinstance(v, dict):
keys = list(v.keys())
vals = list(v.values())
if all(isinstance(l, list) and len(l)==len(vals[0]) for l in vals):
dic[k] = []
val_tuple = set(zip(*vals)) # removing duplicates with set()
for t in val_tuple:
dic[k].append({subkey: t[i] for i, subkey in enumerate(keys)})
else:
merge_lists(v)
elif isinstance(v, list):
dic[k] = list(set(v))   # removing list duplicates
def clean_blanks(value):
"""
Recursively remove all None values from dictionaries and lists, and returns
the result as a new dictionary or list.
"""
if isinstance(value, list):
return [clean_blanks(x) for x in value if x != ""]
elif isinstance(value, dict):
return {
key: clean_blanks(val)
for key, val in value.items()
if val != "" and val != {}
}
else:
return value
def add_to_dict(section_added_to, section_to_add, value, reportNum):
"""
Adds a value to a given spot within a dictionary set.
section_added_to is optional for adding the set to a deeper section such as facility
section_to_add is the name that the new dictionary entry will have
value is the item to be added
reportNum is the number indicating which report to add to, starting at 0
"""
if section_added_to != '':
end_list[reportNum][section_added_to][section_to_add] = value
else:
end_list[reportNum][section_to_add] = value
def read_add_vals(filename_prefix, added_to, section):
for i in range(len(end_list)):
temp_list = []
filename = filename_prefix + str(i+1) + ".csv"
if not exists(filename):
continue;
temp_df = pd.read_csv(filename, header = 1, sep=",", keep_default_na=False, engine="python")
temp_json = temp_df.to_dict(orient="records")
for y in temp_json:
return_ini = unflatten_dic(y)
temp_list.append(return_ini)
add_to_dict(added_to, section, temp_list, i)

global end_list
end_list = []
for x in json_data:
return_ini = unflatten_dic(x)
end_list.append(return_ini)
#read_add_vals('excelTestPermitsFac', 'facility', 'permits');

json_data = clean_blanks(end_list)
final_json = {"year":2021, "version":"2022-02-14", "reports":json_data}
print(json.dumps(final_json, indent=4))

这段代码的某些部分涉及到整体JSON的其他组件,但我主要关心的是如何更改unflat_dic ()以下是我当前更改unflat_dic()的工作代码,尽管它不起作用…

def list_get(list, list_item):
i = 0
for dict in list:
if list_item in dict:
return dict.get(list_item, {})
i += 1
return {}    

def check_in_list(list, list_item):
i = 0
for dict in list:
if list_item in dict:
return i
i += 1
return -1  
def unflatten_dic(dic):
"""
Unflattens a CSV list into a set of nested dictionaries
"""
ini = {}
for k,v in list(dic.items()):
node = ini
list_bool = False
*parents, key = k.split('.')
for parent in parents:
if parent[0] == '@':
list_bool = True
previous_node_list = False
if list_bool:
for parent in parents:
print(parent)
if parent[0] == '@':
node[parent[1:]] = node = node.get(parent[1:], [])
ends_with_dict = False
previous_node_list = True
else:
print("else")
if previous_node_list:
print("prev list")
i = check_in_list(node, parent)
if i >= 0:
node[i] = node = list_get(node, parent)
else:
node.append({parent : {}})
previous_node_list = False
ends_with_dict = True
else:
print("not prev list")
node[parent] = node = node.get(parent, {})
previous_node_list = False
if ends_with_dict:
node[key] = v
else:
node.append({key : v})
else:
for parent in parents:
node[parent] = node = node.get(parent, {})
node[key] = v
#print(node)
return ini

任何帮助,即使是很小的帮助,我都将非常感激。

使用递归和collections.defaultdict对父条目进行分组是最简单的(每个条目在csv数据中由.分隔):

from collections import defaultdict
def to_dict(vals, is_list = 0):
def form_child(a, b):
return b[0][0] if len(b[0]) == 1 else to_dict(b, a[0] == '@')
d = defaultdict(list)
for a, *b in vals:
d[a].append(b)
if not is_list:
return {a[a[0] == '@':]:form_child(a, b) for a, b in d.items()} 
return [{a[a[0] == '@':]:form_child(a, b)} for a, b in d.items()]

import csv, json
with open('filename.csv') as f:
data = list(csv.reader(f))
r = [a.split('.')+[b] for i in range(0, len(data), 2) for a, b in zip(data[i], data[i+1])]
print(json.dumps(to_dict(r), indent=4))

输出:

{
"x": {
"a": "ab",
"b": {
"z": "cd",
"y": "ef"
},
"c": [
{
"z": {
"nest1": "gh",
"nest2": "ij"
}
},
{
"yy": "kl"
}
],
"d": "mn",
"e": {
"z": "op",
"y": "qr"
}
}
}

我设法让它在什么似乎是所有的情况下工作。下面是我为unflat_dic()函数编写的代码。

def unflatten_dic(dic):
"""
Unflattens a CSV list into a set of nested dictionaries
"""
ini = {}
for k,v in list(dic.items()):
node = ini
list_bool = False
*parents, key = k.split('.')
# print("parents")
# print(parents)
for parent in parents:
if parent[0] == '@':
list_bool = True
if list_bool:
for parent in parents:
if parent[0] == '@':
node[parent[1:]] = node = node.get(parent[1:], [])
elif parent.isnumeric():
# print("numeric parent")
# print("length of node")
# print(len(node))
if len(node) > int(parent):
# print("node length good")
node = node[int(parent)]
else:
node.append({})
node = node[int(parent)]
else: 
node[parent] = node = node.get(parent, {})
try:
node.append({key : v})
except AttributeError:
node[key] = v 
else:
for parent in parents:
node[parent] = node = node.get(parent, {})
node[key] = v
return ini

到目前为止我还没有遇到问题,这是基于以下CSV规则的:

@在任何名称之前导致该项为列表

如果CSV中列表后面紧接的部分是一个数字,那么将在列表中创建多个字典。下面是一个例子

x.a, x.b.z, x.b.y, x.@c.0.zz, x.@c.1.zz, x.@c.2.zz, x.d, x.e.z, x.e.y, x.@c.1.yy.l, x.@c.1.yy.@m.q, x.@c.1.yy.@m.r
ab, cd, ef, gh, , kl, mn, op, qr, st, uv, wx
12, 34, 56, 78, 90, 09, , 65, 43, 21, , 92

这将在格式化

后产生以下JSON
"reports": [
{
"x": {
"a": "ab",
"b": {
"z": "cd",
"y": "ef"
},
"c": [
{
"zz": "gh"
},
{
"yy": {
"l": "st",
"m": [
{
"q": "uv"
},
{
"r": "wx"
}
]
}
},
{
"zz": "kl"
}
],
"d": "mn",
"e": {
"z": "op",
"y": "qr"
}
}
},
{
"x": {
"a": "12",
"b": {
"z": "34",
"y": "56"
},
"c": [
{
"zz": "78"
},
{
"zz": "90",
"yy": {
"l": "21",
"m": [
{
"r": "92"
}
]
}
},
{
"zz": "09"
}
],
"e": {
"z": "65",
"y": "43"
}
}
}
]

最新更新