I have a text file structured as follows:
SOURCE: RCM
DESTINATIONS BEGIN
JCK SF3
DESTINATIONS END
SOURCE: TRO
DESTINATIONS BEGIN
GFN SF3
SYD SF3 DH4
DESTINATIONS END
I am trying to build a nested dictionary so that the result looks like this:
handout_routes = {
    'RCM': {'JCK': ['SF3']},
    'TRO': {'GFN': ['SF3'], 'SYD': ['SF3', 'DH4']}
}
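This is only a sample of the data, but when reading it we can assume the following:
- The first line starts with SOURCE: followed by a three-letter IATA airport code.
- The line after each line that starts with SOURCE: is DESTINATIONS BEGIN.
- There are one or more lines between DESTINATIONS BEGIN and DESTINATIONS END.
- Every DESTINATIONS BEGIN line has a corresponding DESTINATIONS END line.
- The lines between DESTINATIONS BEGIN and DESTINATIONS END start with a three-letter IATA airport code, followed by one or more three-character alphanumeric aircraft codes. Codes are separated by a single space.
- The line after DESTINATIONS END either starts with SOURCE: or you have reached the end of the file.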
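The format rules above can be sketched as a small line classifier. This is only an illustration: the names SOURCE_RE, DEST_RE and classify are hypothetical, and the patterns assume that all codes are upper-case, which the rules do not state explicitly.

```python
import re

# Hypothetical patterns, one per line type described in the rules.
# Assumption: airport codes are upper-case letters, aircraft codes are
# upper-case letters or digits.
SOURCE_RE = re.compile(r"SOURCE: ([A-Z]{3})")
DEST_RE = re.compile(r"([A-Z]{3})((?: [A-Z0-9]{3})+)")


def classify(line):
    """Return a (kind, payload) tuple for one line, or raise ValueError."""
    line = line.rstrip("\n")
    if line == "DESTINATIONS BEGIN":
        return ("begin", None)
    if line == "DESTINATIONS END":
        return ("end", None)
    m = SOURCE_RE.fullmatch(line)
    if m:
        return ("source", m.group(1))
    m = DEST_RE.fullmatch(line)
    if m:
        # group(2) holds the space-separated aircraft codes
        return ("destination", (m.group(1), m.group(2).split()))
    raise ValueError(f"unrecognised line: {line!r}")
```

Classifying each line first keeps the format check separate from the code that builds the nested dictionary.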
This is what I have tried so far:
with open("file_path", encoding='utf-8') as text_data:
    answer = {}
    for line in text_data:
        line = line.split()
        if not line:  # empty line?
            continue
        answer[line[0]] = line[1:]
    print(answer)
But the data it returns looks like this:
{'SOURCE:': ['WYA'], 'DESTINATIONS': ['END'], 'KZN': ['146'], 'DYU': ['320']}
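I think the problem is in how I have structured the code that reads the file. Any help would be appreciated. It may be that my code is too simple for what needs to be done with the file. Thank you.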
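A likely reason for the flat result is that the attempt uses the first word of every line as a key in one dictionary, so repeated keys such as 'SOURCE:' and 'DESTINATIONS' are overwritten and destinations are never grouped under their source. The sketch below reproduces that with the sample data held in a string (the names sample, flat, nested and current are illustrative), then shows one minimal way to remember the current source while reading.

```python
import io

sample = """SOURCE: RCM
DESTINATIONS BEGIN
JCK SF3
DESTINATIONS END
SOURCE: TRO
DESTINATIONS BEGIN
GFN SF3
SYD SF3 DH4
DESTINATIONS END
"""

# Same logic as the attempt: one flat dict keyed by the first word.
flat = {}
for line in io.StringIO(sample):
    parts = line.split()
    if not parts:
        continue
    # Each assignment replaces any earlier value stored under the same key.
    flat[parts[0]] = parts[1:]

# Minimal variant: keep a reference to the inner dict of the current source.
nested = {}
current = None
for line in io.StringIO(sample):
    parts = line.split()
    if not parts or parts[0] == "DESTINATIONS":
        continue  # blank lines and BEGIN/END markers carry no data
    if parts[0] == "SOURCE:":
        current = nested.setdefault(parts[1], {})
    else:
        current[parts[0]] = parts[1:]

print(flat)
print(nested)
```

In flat, only the last 'SOURCE:' and 'DESTINATIONS' lines survive, whereas nested has the shape asked for in the question.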
The program I wrote works fine:
def unpack(file):
    # `file` is the full text of the file, not a file object
    contents: dict = {}
    source: str
    for line in file.split('\n'):
        if line[:12] == 'DESTINATIONS':
            # these lines don't affect the result, so we ignore them
            pass
        elif not line:
            # empty line, so we ignore it
            pass
        elif line[:6] == 'SOURCE':
            # the airport code is whatever follows the last space
            source = line.rpartition(' ')[-1]
            if source not in contents:
                contents[source] = {}
        else:
            # first word is the destination, the rest are aircraft codes
            idx, *data = line.split(' ')
            contents[source][idx] = list(data)
    return contents

with open('file.txt') as file:
    handout_routes = unpack(file.read())
print(handout_routes)
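I know there is already an accepted answer, but I used an approach that can actually help you find formatting errors in the file, rather than just ignoring the extra bits: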
from tokenize import TokenInfo, tokenize, ENCODING, ENDMARKER, NEWLINE, NAME
from typing import Callable, Generator

class TripParseException(Exception):
    pass

def assert_token_string(token: TokenInfo, expected_string: str):
    if token.string != expected_string:
        raise TripParseException("Unable to parse trip file: expected {}, found {} in line {} ({})".format(
            expected_string, token.string, str(token.start[0]), token.line
        ))

def assert_token_type(token: TokenInfo, expected_type: int):
    if token.type != expected_type:
        raise TripParseException("Unable to parse trip file: expected type {}, found type {} in line {} ({})".format(
            expected_type, token.type, str(token.start[0]), token.line
        ))

def parse_destinations(token_stream: Generator[TokenInfo, None, None]) -> dict:
    destinations = dict()
    assert_token_string(next(token_stream), "DESTINATIONS")
    assert_token_string(next(token_stream), "BEGIN")
    assert_token_type(next(token_stream), NEWLINE)
    current_token = next(token_stream)
    while current_token.string != "DESTINATIONS":
        assert_token_type(current_token, NAME)
        destination = current_token.string
        plane_codes = list()
        current_token = next(token_stream)
        while current_token.type != NEWLINE:
            assert_token_type(current_token, NAME)
            plane_codes.append(current_token.string)
            current_token = next(token_stream)
        destinations[destination] = plane_codes
        # current token is NEWLINE, get the first token on the next line.
        current_token = next(token_stream)
    # Just parsed "DESTINATIONS", expecting "DESTINATIONS END"
    assert_token_string(next(token_stream), "END")
    assert_token_type(next(token_stream), NEWLINE)
    return destinations

def parse_trip(token_stream: Generator[TokenInfo, None, None]):
    current_token = next(token_stream)
    if current_token.type == ENDMARKER:
        return None, None
    assert_token_string(current_token, "SOURCE")
    assert_token_string(next(token_stream), ":")
    tok_origin = next(token_stream)
    assert_token_type(tok_origin, NAME)
    assert_token_type(next(token_stream), NEWLINE)
    destinations = parse_destinations(token_stream)
    return tok_origin.string, destinations

def parse_trips(readline: Callable[[], bytes]) -> dict:
    token_gen = tokenize(readline)
    assert_token_type(next(token_gen), ENCODING)
    trips = dict()
    while True:
        origin, destinations = parse_trip(token_gen)
        if origin is not None and destinations is not None:
            trips[origin] = destinations
        else:
            break
    return trips
Your implementation would then look like this:
import pprint

with open("trips.dat", "rb") as trips_file:
    trips = parse_trips(trips_file.readline)
pprint.pprint(
    trips
)
This produces the expected result:
{'RCM': {'JCK': ['SF3']}, 'TRO': {'GFN': ['SF3'], 'SYD': ['SF3', 'DH4']}}
This is also more flexible if you later want to put additional information in the file.
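One caveat with a tokenizer-based parser: Python's tokenizer reports a code such as 146 as a NUMBER token rather than a NAME, so the numeric aircraft codes visible in the question's output ('146', '320') would be rejected by the NAME checks. As an alternative sketch of the same idea (report format errors with a line number instead of ignoring them), here is a plain line-based validator. It is not the tokenizer approach itself, and the names RouteFormatError and parse_routes_strict are hypothetical. It assumes there are no blank lines, as in the format described in the question.

```python
class RouteFormatError(ValueError):
    """Hypothetical exception type for malformed route files."""


def parse_routes_strict(lines):
    """Parse an iterable of lines, raising RouteFormatError on bad input."""
    routes = {}
    source = None     # airport code of the block being read, if any
    in_block = False  # True between DESTINATIONS BEGIN and DESTINATIONS END
    lineno = 0
    for lineno, raw in enumerate(lines, start=1):
        parts = raw.split()
        if source is None:
            # Outside any block: only "SOURCE: <code>" is allowed.
            if len(parts) != 2 or parts[0] != "SOURCE:":
                raise RouteFormatError(
                    f"line {lineno}: expected 'SOURCE: <code>', found {raw.strip()!r}")
            source = parts[1]
            routes[source] = {}
        elif not in_block:
            # Directly after a SOURCE line: the block must open.
            if parts != ["DESTINATIONS", "BEGIN"]:
                raise RouteFormatError(
                    f"line {lineno}: expected 'DESTINATIONS BEGIN', found {raw.strip()!r}")
            in_block = True
        elif parts == ["DESTINATIONS", "END"]:
            if not routes[source]:
                raise RouteFormatError(
                    f"line {lineno}: no destinations listed for {source!r}")
            source, in_block = None, False
        else:
            # Destination line: airport code plus at least one aircraft code.
            if len(parts) < 2:
                raise RouteFormatError(
                    f"line {lineno}: expected a destination and aircraft codes, "
                    f"found {raw.strip()!r}")
            routes[source][parts[0]] = parts[1:]
    if source is not None:
        raise RouteFormatError(
            f"line {lineno}: file ended inside the block for {source!r}")
    return routes
```

It accepts any iterable of lines, so an open text file can be passed directly, for example parse_routes_strict(open_file) inside a with block.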
from itertools import takewhile
import re

def destinations(lines):
    # Yield one [destination, *aircraft codes] list per line of the block
    if next(lines, '').startswith('DESTINATIONS BEGIN'):
        dest = takewhile(lambda l: not l.startswith('DESTINATIONS END'), lines)
        yield from map(str.split, dest)

def sources(lines):
    source = re.compile(r'SOURCE:\s*(\w+)')
    # The := operator requires Python 3.8 or later
    while m := source.match(next(lines, '')):
        yield (m.group(1),
               {dest: crafts for dest, *crafts in destinations(lines)})

with open('file_path', encoding='utf-8') as lines:
    handout_routes = dict(sources(lines))
print(handout_routes)