Converting unstructured data into a Python dictionary



I am trying to convert unstructured data into a Python dictionary. The data looks like this:

main sub_main sub_main_1
 AAA A-ABC ABC
 AAA A-DEF A-DEF-GHI GHI
main sub_main sub_main_2
 BBB B-ABC ABC
 BBB B-DEF DEF
 BBB B-X B-Y B-Z ""
main sub_main sub_main_3
 CCC C-ABC  ABC
 CCC C-X C-Y C-Z ""
 CCC C-PQR C-STU 2
  C-LMN C-OPQ C-RST ""
 CCC C-DEF C-DEF-GHI ""
 CCC C-DEF C-DEF-JKL C-MNO 1
  C-XYZ ""
main sub_main sub_main_4
 DDD D-ABC  DEF
 DDD D-PQR  STU
main sub_main sub_main_5
 EEE E-ABC DEF
 EEE E-PQR STU
main sub_main sub_main_6
 FFF F-ABC  F-DEF
 FFF F-PQR  F-STU

There are a few rules for converting this data into a nested Python dictionary:

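  1. The number of leading spaces on each line defines the node's level in the dictionary.
  2. A line can contain multiple keys. For example, CCC C-X C-Y C-Z 1 should produce four nested keys, with 1 as the value of C-Z. (This applies to child nodes; for parent nodes, see the next point.) Like this:
    'CCC': {'C-X': {'C-Y': {'C-Z': 1}}}
    
  3. If the next line starts with more spaces, the current line is a parent node and the next line is its child. In that case, the last two items of the current line are merged into a single key, separated by a space. Like this: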
    main sub_main sub_main_2
    BBB B-ABC ABC
    
    becomes:
    'main': {'sub_main sub_main_2': {'BBB': {'B-ABC': 'ABC'}}}
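
As a rough illustration of rules 1 to 3 (not the parser from this post, and not a complete solution), the three per-line operations could be sketched as follows. The helper names indent_level, nest and parent_keys are invented for this example, and values are kept as strings, so the 1 in the example above comes out as '1'.

```python
from typing import Any, List


def indent_level(line: str) -> int:
    """Rule 1: the number of leading spaces gives the node level."""
    return len(line) - len(line.lstrip(" "))


def nest(tokens: List[str]) -> Any:
    """Rule 2: all tokens but the last become nested keys, the last is the value."""
    *keys, value = tokens
    # A literal "" in the data stands for an empty string value
    result: Any = "" if value == '""' else value
    for key in reversed(keys):
        result = {key: result}
    return result


def parent_keys(tokens: List[str]) -> List[str]:
    """Rule 3: on a parent line, the last two tokens merge into one key."""
    return tokens[:-2] + [" ".join(tokens[-2:])]


print(indent_level("  C-XYZ \"\""))
print(nest("CCC C-X C-Y C-Z 1".split()))
print(parent_keys("main sub_main sub_main_2".split()))
```

Deciding whether a line is a parent or a leaf still requires looking at the indentation of the following line, which is the part that makes the full parser harder.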
    

Here is the expected output:

{'main': {'sub_main sub_main_1': {'AAA': {'A-ABC': 'ABC',
                                          'A-DEF': {'A-DEF-GHI': 'GHI'}}},
          'sub_main sub_main_2': {'BBB': {'B-ABC': 'ABC',
                                          'B-DEF': 'DEF',
                                          'B-X': {'B-Y': {'B-Z': ''}}}},
          'sub_main sub_main_3': {'CCC': {'C-ABC': 'ABC',
                                          'C-DEF': {'C-DEF-JKL': {'C-MNO 1': {'C-XYZ': ''}},
                                                    'C-DEF-GHI': ''},
                                          'C-PQR': {'C-STU 2': {'C-LMN': {'C-OPQ': {'C-RST': ''}}}},
                                          'C-X': {'C-Y': {'C-Z': ''}}}},
          'sub_main sub_main_4': {'DDD': {'D-ABC': 'DEF',
                                          'D-PQR': 'STU'}},
          'sub_main sub_main_5': {'EEE': {'E-ABC': 'DEF',
                                          'E-PQR': 'STU'}},
          'sub_main sub_main_6': {'FFF': {'F-ABC': 'F-DEF',
                                          'F-PQR': 'F-STU'}}}}

This is the code I am using:

def set_dict_data(dic, key_list, key_name, value):
    """
    Set the value of key up to n depth
    :param dic: Output dictionary
    :param key_list: List of previous keys
    :param key_name: key name
    :param value: Value
    :return:
    """
    for key in key_list:
        # Get the value as per key, if key is missing then set with blank dictionary
        dic = dic.setdefault(key, {})
    # Set the value of the key_name
    dic[key_name] = value

def get_dict_data(dic, key_list):
    """
    Get the value of key up to n depth
    :param dic: Output dictionary
    :param key_list: List of previous keys
    :return:
    """
    for key in key_list:
        # Get the value as per key, if key is missing then set with blank dictionary
        dic = dic.setdefault(key, {})
    return dic

def get_space_counter(input_list):
    """
    Get current space counter
    :param input_list:
    :return:
    """
    found_space = True
    space_counter = 0
    for j in input_list:
        if found_space and j == '':
            space_counter += 1
        else:
            break
    return space_counter

def set_val(temp, output, keys):
    """
    Set key, value pair of data upto n-2 keys in temp list
    :param temp: List of data
    :param output: Output dictionary
    :param keys: List of keys
    :return:
    """
    set_counter = 0
    for set_counter, i in enumerate(temp[:-2], start=1):
        if not get_dict_data(output, keys):
            set_dict_data(output, keys, i, {})
        keys.append(i)
    return set_counter

def custom_parser(input):
    """
    Parse unstructured data into a python dictionary
    :param input: Input data
    :return: Python dictionary
    """
    # Initialize the variables
    output = {}
    counter = 0
    keys = []
    key_line_counter = 0
    # Iterate through the input list data
    for i, input_str in enumerate(input):
        # Convert string into list based on empty space
        split_list = input_str.strip('\n').split(' ')
        # Get the initial space counter
        current_space_counter = get_space_counter(split_list)
        # Remove un-necessary space from the list
        new_temp = list(filter(lambda x: x != '', split_list[counter:]))
        try:
            # Try to find the initial space counter of the next string input
            next_split_list = input[i + 1].strip('\n').split(' ')
            next_space_counter = get_space_counter(next_split_list)
        except IndexError:
            next_space_counter = current_space_counter
        # If the current input space counter is less than the next input space counter,
        # that means the current input is the parent node and next input is the child node
        if current_space_counter < next_space_counter:
            # If Number of keys in each line is not equal to the current space counter
            # and the number of keys in each line is greater than 0 then pop the key from keys
            if key_line_counter != current_space_counter and key_line_counter > 0:
                for _ in range(key_line_counter + 1):
                    keys.pop()
            # Get the number of keys in each line
            set_counter = set_val(new_temp, output, keys)
            key_line_counter = set_counter
            # Generate key name, if the next line is the child node then in the current line,
            # last two items merged into one as a key with space as a separator
            key_name = f'{split_list[-2]} {split_list[-1]}'
            # Slice the keys
            keys = keys[:current_space_counter + set_counter + 1]
            # Set the key, value pair in output dictionary
            set_dict_data(output, keys, key_name, {})
            # Append the key_name into the keys list
            keys.append(key_name)
        else:
            # Get the number of keys in each line
            set_counter = set_val(new_temp, output, keys)
            # Set the key, value pair in output dictionary
            set_dict_data(output, keys[:current_space_counter + set_counter + key_line_counter + 1], new_temp[-2],
                          new_temp[-1].replace('"', ''))
            # As per the set_counter, pop the key from the keys list
            for _ in range(set_counter):
                keys.pop()
    return output

if __name__ == '__main__':
    # input_data is the list of input lines shown above (its definition is not included in the post)
    print(custom_parser(input_data))

And this is the output I get:

{'main': {'main': {'sub_main sub_main_5': {'EEE': {'E-ABC': 'DEF',
                                                   'E-PQR': 'STU'}},
                   'sub_main sub_main_6': {'FFF': {'F-ABC': 'F-DEF',
                                                   'F-PQR': 'F-STU'}}},
          'sub_main sub_main_1': {'AAA': {'A-ABC': 'ABC',
                                          'A-DEF': {'A-DEF-GHI': 'GHI'}}},
          'sub_main sub_main_2': {'BBB': {'B-ABC': 'ABC',
                                          'B-DEF': 'DEF',
                                          'B-X': {'B-Y': {'B-Z': ''}}}},
          'sub_main sub_main_3': {'CCC': {'C-ABC': 'ABC',
                                          'C-DEF': {'C-DEF-JKL': {'C-MNO 1': {'C-XYZ': ''}}},
                                          'C-PQR': {'C-STU 2': {'C-LMN': {'C-OPQ': {'C-RST': ''}},
                                                                'CCC': {'C-DEF': {},
                                                                        'C-DEF-GHI': ''}}},
                                          'C-X': {'C-Y': {'C-Z': ''}}},
                                  'sub_main sub_main_4': {'DDD': {'D-ABC': 'DEF',
                                                                  'D-PQR': 'STU'}}}}}

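If you compare the expected output with the actual output (both shown above), you can see the problems I am facing without my listing them explicitly. Please guide me on how to fix them. Thank you very much.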

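Well, this turned out to be a bit more involved than expected, but this solution does what you need, although it works somewhat differently from what you started with: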

from typing import Any, List, TextIO, Optional, Tuple
from io import StringIO
sample = StringIO("""main sub_main sub_main_1
 AAA A-ABC ABC
 AAA A-DEF A-DEF-GHI GHI
main sub_main sub_main_2
 BBB B-ABC ABC
 BBB B-DEF DEF
 BBB B-X B-Y B-Z ""
main sub_main sub_main_3
 CCC C-ABC  ABC
 CCC C-X C-Y C-Z ""
 CCC C-PQR C-STU 2
  C-LMN C-OPQ C-RST ""
 CCC C-DEF C-DEF-GHI ""
 CCC C-DEF C-DEF-JKL C-MNO 1
  C-XYZ ""
main sub_main sub_main_4
 DDD D-ABC  DEF
 DDD D-PQR  STU
main sub_main sub_main_5
 EEE E-ABC DEF
 EEE E-PQR STU
main sub_main sub_main_6
 FFF F-ABC  F-DEF
 FFF F-PQR  F-STU""")

def _dig(d: dict, keys: List[str], value: Any):
    """
    returns a copy of d, recursively updated with value using nested list of string keys
    """
    return d | {
        keys[0]: (
            _dig({}, keys[1:], value) if keys[0] not in d else _dig(d[keys[0]], keys[1:], value)
        ) if len(keys) > 1 else (value if value != '""' else '')}

def _data_to_dict(fp: TextIO, next_line: Optional[Tuple[int, List[str]]], process_line: Optional[Tuple[int, List[str]]], level: int):
    # next_line and process_line are (indent, tokens) pairs, or None
    result = {}
    while True:
        # if there's no line to process, process next_line and load a new next_line
        if process_line is None:
            process_line = next_line
            try:
                line = next(fp)
                next_line = len(line) - len(line.lstrip()), [key for key in line.strip().split() if key]
            except StopIteration:
                # if no next_line could be read, done if process_line is None as well
                if process_line is None:
                    return next_line, result
                # otherwise, continue with next_line = None
                next_line = None
        else:
            # if the line to process is at the same or deeper level as the next line
            if next_line is None or process_line[0] >= next_line[0]:
                result = _dig(result, process_line[1][:-1], process_line[1][-1])
                if next_line is None or process_line[0] > next_line[0]:
                    return next_line, result
            else:  # process_line[0] < next_line[0]: the next line is a child, so recurse
                next_line, sub = _data_to_dict(fp, next_line, None, level + 1)
                result = _dig(result, process_line[1][:-2] + [f'{process_line[1][-2]} {process_line[1][-1]}'], sub)
                if next_line is not None and next_line[0] < level:
                    return next_line, result
            process_line = None

def data_to_dict(fp: TextIO):
    __, result = _data_to_dict(fp, None, None, 0)
    return result

# operating on StringIO here, would work with open text file as well
print(data_to_dict(sample))

It doesn't pretty-print the dictionary, but you'll find it has the structure you need.
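
If a more readable dump is wanted, the standard library can help. The snippet below is only a sketch on a small hand-written dictionary (the name result and its contents are made up for the example, shaped like the parser's output); pprint wraps the nested dict across lines, and json.dumps gives an indented alternative.

```python
import json
from pprint import pformat

# Hypothetical stand-in for the dictionary returned by data_to_dict
result = {'main': {'sub_main sub_main_4': {'DDD': {'D-ABC': 'DEF', 'D-PQR': 'STU'}}}}

# pformat returns the same text that pprint would print; a narrow width forces wrapping
formatted = pformat(result, width=60)
print(formatted)

# json.dumps with indent gives one key per line, which can be easier to scan
as_json = json.dumps(result, indent=2)
print(as_json)
```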

On earlier versions of Python, replace _dig with the version below, since the | operator for dictionaries was only added in 3.9.0:

def _dig(d: dict, keys: List[str], value: Any):
    """
    returns a copy of d, recursively updated with value using nested list of string keys
    """
    return {**d, **{
        keys[0]: (
            _dig({}, keys[1:], value) if keys[0] not in d else _dig(d[keys[0]], keys[1:], value)
        ) if len(keys) > 1 else (value if value != '""' else '')}}

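I tested the same code with this updated _dig on Python 3.6 and it works. If you are on an even older version of Python, I strongly recommend upgrading (or making it very clear in your question that you are using a very outdated version of Python).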
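
To see why the two versions of _dig behave the same, here is a small check on made-up dictionaries (left and right are hypothetical names used only here). Both forms build a new dict in which the right-hand values win, and neither modifies its inputs.

```python
import sys

left = {'A-ABC': 'ABC', 'A-DEF': 'old'}
right = {'A-DEF': 'new'}

# Unpacking form: works on older Python 3 versions as well
merged_unpack = {**left, **right}
print(merged_unpack)

# Operator form: only available from Python 3.9 onwards
if sys.version_info >= (3, 9):
    merged_pipe = left | right
    print(merged_pipe == merged_unpack)

# The original dictionaries are left untouched by either form
print(left)
```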
