Parsing CSV data files with nested stringified dictionaries

I'm trying to use a large collection (20GB, ~1000 files) of Telegram CSV data gathered over a few weeks via Telethon. I've collected other data before, but this time I completely messed up: I used the wrong script and let my collection script run to completion without ever checking that the new output could be parsed. I've been stuck on this for days.

I need to fix the data so I can read it into a DataFrame.

Data structure of the CSVs:

  • index: integer
  • parent_msg_id: integer
  • message_dict: stringified dictionary

Header and example first row of one of the CSV files:

,parent_msg_id,message_dict
0,2103,"{...}"

The message_dict column is giving me real trouble. Because of a few issues it isn't valid JSON, so I can't easily convert it to a dictionary (using json.loads or ast.literal_eval); a minimal repro follows the sample below:

  • Uses ' instead of " (and the 'message' item often contains both as punctuation, so a simple str.replace() can't be applied to the whole string)
  • Datetimes weren't converted to a text format but were saved as the repr of datetime objects (these show up in the 'date' and sometimes the 'edit_date' values)
  • The datetime formats are inconsistent
  • The 'media' value can be very long and contain newline characters

Example below:

"{'_': 'Message', 
'id': 29903, 
'peer_id': {'_': 'PeerChannel', 'channel_id': 1333360824}, 
'date': datetime.datetime(2021, 12, 15, 2, 31, 58, tzinfo=datetime.timezone.utc), 
'message': 'Let's all go here: https://twitter.com/i/status/1470455120513142792', 
'out': False, 
'mentioned': False, 
'media_unread': False, 
'silent': False, 
'post': False, 
'from_scheduled': False, 
'legacy': False, 
'edit_hide': False, 
'pinned': False, 
'from_id': {'_': 'PeerUser', 'user_id': 331122594}, 
'fwd_from': None, 
'via_bot_id': None, 
'reply_to': {'_': 'MessageReplyHeader', 'reply_to_msg_id': 29738, 'reply_to_peer_id': None, 'reply_to_top_id': None}, 
'media': {'_': 'MessageMediaWebPage', 'webpage': {'_': 'WebPage', 'id': 8720606894401618080, 'url': 'https://twitter.com/status/1470455120513142792', 'display_url': 'twitter.com/status/1470455120513142792', 'hash': 0, 'type': 'photo', 'site_name': 'Twitter', 'title': 'title', 'description': 'wordswordwords', 'photo': {'_': 'Photo', 'id': 5890060159542209872, 'access_hash': 6113394333987761493, 'file_reference': b'x00axedxd6|x0ex83xbe(xc0xb9^ux8ax19xb9xe6xe5xbfxd5v', 'date': datetime.datetime(2021, 12, 13, 18, 12, 41, tzinfo=datetime.timezone.utc), 'sizes': [{'_': 'PhotoStrippedSize', 'type': 'i', 'bytes': b'x01x17(gx98x80px94xe8xa4x05rxea?x01Txeex18xa1n28xcd6)x88x95x0bx1fx978#xdax804%xf2xc8x04xc7xd4qLx08x87<bxadjx11x84x81nx91x81xebxdexa9x15!2dx07xb6x07nxb4xefxa0%xaexa4x8c""'x0bx19xa2xaaxa4xa7pxe4dQTx996x18Fxf3x9exb9xe2x9cxa8xa3xf8:xf7xa2x8ax83Axd2xcb+x95xcbx16xdbxd0x13Qx05|x92x7fx9dx14P!6xb6xecx9ex87xbdx14Q@x1f'}, {'_': 'PhotoSize', 'type': 'm', 'w': 320, 'h': 180, 'size': 13886}, {'_': 'PhotoSize', 'type': 'x', 'w': 800, 'h': 450, 'size': 48906}, {'_': 'PhotoSizeProgressive', 'type': 'y', 'w': 1200, 'h': 675, 'sizes': [10804, 24072, 30765, 43577, 73327]}], 'dc_id': 4, 'has_stickers': False, 'video_sizes': []}, 'embed_url': None, 'embed_type': None, 'embed_width': None, 'embed_height': None, 'duration': None, 'author': '@who', 'document': None, 'cached_page': None, 'attributes': []}}, 
'reply_markup': None, 
'entities': [{'_': 'MessageEntityUrl', 'offset': 0, 'length': 48}], 
'views': None, 
'forwards': None, 
'replies': None, 
'edit_date': None, 
'post_author': None, 
'grouped_id': None, 
'restriction_reason': [], 
'ttl_period': None}"

What I've tried:

I came up with a very... hacky, throw-the-kitchen-sink-at-it attempt at solving this: use a regex to pull out the message text, replace the remaining ' with ", use regex again to find the datetime text, convert the datetime-object repr to an isoformat string, and strip out some of the troublesome metadata. The regex code works, but it is very slow, and I'd really appreciate feedback. At this rate it might be faster to just start the collection over.

import ast
import re
from datetime import datetime

import pandas as pd

rmsg = re.compile("'message': (.*?), 'out': ")
rdate = re.compile('"date": (.*?), "message": ')
rmedia = re.compile('"media": (.*?), "reply_markup": ')
reditdate = re.compile('"edit_date": (.*?), "post_author": ')

for filename in workingfiles:  # workingfiles: the list of collected CSV paths
    df = pd.read_csv(filename, index_col=0)
    if len(df) == 0:
        continue
    new_df = pd.DataFrame()
    for row in df.iterrows():
        # the actual dictionary that needs to be fixed
        messagedict = row[1]['message_dict']

        # return the message text and save it
        msg = rmsg.search(messagedict)
        if msg:
            messagetxt = msg.group(1)
            # split dictionary before and after message text
            splitmsg = messagedict.split(messagetxt, 1)

            # replace ' with " everywhere except inside the message text
            part1 = splitmsg[0].replace("'", '"')
            part2 = splitmsg[1].replace("'", '"')
            # combine
            correctedmsg = part1 + messagetxt + part2

            # pull datetime string
            mdate = rdate.search(correctedmsg)
            if mdate:
                datetime_string = mdate.group(1)

                # convert datetime repr to isoformat (with and without seconds)
                datestrformat = 'datetime.datetime(%Y, %m, %d, %H, %M, %S, tzinfo=datetime.timezone.utc)'
                datestrformat2 = 'datetime.datetime(%Y, %m, %d, %H, %M, tzinfo=datetime.timezone.utc)'
                try:
                    datetext = str(datetime.strptime(datetime_string, datestrformat).isoformat())
                    #print(datetext)
                except Exception as e:
                    #print(e)
                    datetext = str(datetime.strptime(datetime_string, datestrformat2).isoformat())

                mmedia = rmedia.search(correctedmsg)
                if mmedia:
                    media = mmedia.group(1)
                    # replace media text with "Yes" to indicate it was a media message
                    if media != 'None':
                        part1 = correctedmsg.split(media, 1)[0]
                        part2 = correctedmsg.split(media, 1)[1]
                        correctedmsg = part1 + '"Yes"' + part2

                newline = correctedmsg.split(datetime_string)[0] + '"' + datetext + '"' + correctedmsg.split(datetime_string)[1]
                meditdate = reditdate.search(correctedmsg)
                if meditdate:
                    editdate = meditdate.group(1)
                    if editdate != 'None':
                        part1 = newline.split(editdate, 1)[0]
                        part2 = newline.split(editdate, 1)[1]
                        newline = part1 + '"Yes"' + part2

                linedict = ast.literal_eval(newline)
                new_df = new_df.append(linedict, ignore_index=True)

    df = pd.concat([df['parent_msg_id'], new_df], axis=1)

Let's write a parser! Instead of patching the string with regexes, a small recursive-descent parser can walk the repr character by character and handle each kind of value (single-quoted strings, integers, nested dicts, lists, datetime reprs, bytes literals, booleans and None) directly:

import datetime  # seems unused, but actually required to `eval` the datetimes
import itertools
import json
import string
from typing import Tuple, Union, Dict, Any, List

def find_next_single_quote(text: str, start_pos: int, allow_unescaped=False) -> Tuple[str, int]:
    assert text[start_pos] == "'"
    # find the next unescaped single-quote
    current_pos = start_pos + 1
    while True:
        end_pos = text.find("'", current_pos)
        if end_pos == -1:
            raise ValueError("could not find closing quote")
        # it may be an escaped quote: count the consecutive backslashes just before it
        count_of_backslashes_before = len(tuple(itertools.takewhile(lambda c: c == "\\", text[end_pos - 1::-1])))
        if count_of_backslashes_before % 2 == 0:
            # there is no backslash, or only backslashes themselves escaped (pairs)
            # but it can be an unescaped quote, so we try to detect that
            if allow_unescaped and end_pos + 1 < len(text) and text[end_pos + 1] not in ",}]":  # not the end of a token
                current_pos = end_pos + 1
                continue
            else:
                return text[start_pos + 1:end_pos], end_pos + 1
        else:
            # false positive
            current_pos = end_pos + 1
            continue

def get_json_key(text: str, start_pos: int) -> Tuple[str, int]:
    return find_next_single_quote(text, start_pos)

def get_json_val(text: str, start_pos: int) -> Tuple[Union[str, float, Dict[str, Any], bool, None, bytes, List[Any]], int]:
    if text[start_pos] == "'":  # it's a string
        return find_next_single_quote(text, start_pos, allow_unescaped=True)
    elif text[start_pos] in string.digits:
        current_pos = start_pos + 1
        while text[current_pos] in string.digits:
            current_pos += 1
        return int(text[start_pos:current_pos]), current_pos
    elif text[start_pos] == "{":
        return read_dict(text, start_pos)
    elif text[start_pos:].startswith("datetime"):
        end_pos = text.find(")", start_pos + 1)
        result_datetime = eval(text[start_pos:end_pos + 1])  # quick but dirty
        return result_datetime, end_pos + 1
    elif text[start_pos:].startswith("False"):
        return False, start_pos + 5
    elif text[start_pos:].startswith("True"):
        return True, start_pos + 4
    elif text[start_pos:].startswith("None"):
        return None, start_pos + 4
    elif text[start_pos] == 'b':  # a bytes literal
        return find_next_single_quote(text, start_pos + 1, allow_unescaped=True)
    elif text[start_pos] == "[":
        return read_list(text, start_pos)
    else:
        # fail loudly instead of implicitly returning None
        raise ValueError(f"unsupported value at position {start_pos}: {text[start_pos:start_pos + 20]!r}")

def seek_next_item(text: str, start_pos: int) -> int:
    assert text[start_pos] == ","
    assert text[start_pos + 1] == " "
    return start_pos + 2

def read_list(text: str, start_pos: int = 0) -> Tuple[List[Any], int]:
    assert text[start_pos] == "["
    current_pos = start_pos + 1
    result = []
    first = True
    while text[current_pos] != "]":
        if first:
            first = False
        else:
            current_pos = seek_next_item(text, current_pos)
        item, current_pos = get_json_val(text, current_pos)
        result.append(item)
    return result, current_pos + 1

def seek_next_key(text: str, start_pos: int) -> int:
    assert text[start_pos] == ","
    assert text[start_pos + 1] == " "
    if text[start_pos + 2] == "\n":  # top-level dict: each key starts on a new line
        assert text[start_pos + 3] == "'"
        return start_pos + 3
    else:
        assert text[start_pos + 2] == "'"
        return start_pos + 2

def read_dict(text: str, start_pos: int = 0) -> Tuple[Dict[str, Any], int]:
    assert text[start_pos] == "{"
    current_pos = start_pos + 1
    result = {}
    first = True
    while text[current_pos] != "}":
        if first:
            first = False
        else:
            current_pos = seek_next_key(text, current_pos)
        key, current_pos = get_json_key(text, current_pos)
        # print("key", repr(key), current_pos)
        assert text[current_pos] == ":"
        assert text[current_pos + 1] == " "
        current_pos += 2
        val, current_pos = get_json_val(text, current_pos)
        # print("val", repr(val), current_pos)
        result[key] = val
    return result, current_pos + 1

full_result, _ = read_dict("""{'_': 'Message', 
'id': 29903, 
'peer_id': {'_': 'PeerChannel', 'channel_id': 1333360824}, 
'date': datetime.datetime(2021, 12, 15, 2, 31, 58, tzinfo=datetime.timezone.utc), 
'message': 'Let's all go here: https://twitter.com/i/status/1470455120513142792', 
'out': False, 
'mentioned': False, 
'media_unread': False, 
'silent': False, 
'post': False, 
'from_scheduled': False, 
'legacy': False, 
'edit_hide': False, 
'pinned': False, 
'from_id': {'_': 'PeerUser', 'user_id': 331122594}, 
'fwd_from': None, 
'via_bot_id': None, 
'reply_to': {'_': 'MessageReplyHeader', 'reply_to_msg_id': 29738, 'reply_to_peer_id': None, 'reply_to_top_id': None}, 
'media': {'_': 'MessageMediaWebPage', 'webpage': {'_': 'WebPage', 'id': 8720606894401618080, 'url': 'https://twitter.com/status/1470455120513142792', 'display_url': 'twitter.com/status/1470455120513142792', 'hash': 0, 'type': 'photo', 'site_name': 'Twitter', 'title': 'title', 'description': 'wordswordwords', 'photo': {'_': 'Photo', 'id': 5890060159542209872, 'access_hash': 6113394333987761493, 'file_reference': b'x00axedxd6|x0ex83xbe(xc0xb9^ux8ax19xb9xe6xe5xbfxd5v', 'date': datetime.datetime(2021, 12, 13, 18, 12, 41, tzinfo=datetime.timezone.utc), 'sizes': [{'_': 'PhotoStrippedSize', 'type': 'i', 'bytes': b'x01x17(gx98x80px94xe8xa4x05rxea?x01Txeex18xa1n28xcd6)x88x95x0bx1fx978#xdax804%xf2xc8x04xc7xd4qLx08x87<bxadjx11x84x81nx91x81xebxdexa9x15!2dx07xb6x07nxb4xefxa0%xaexa4x8c""'x0bx19xa2xaaxa4xa7pxe4dQTx996x18Fxf3x9exb9xe2x9cxa8xa3xf8:xf7xa2x8ax83Axd2xcb+x95xcbx16xdbxd0x13Qx05|x92x7fx9dx14P!6xb6xecx9ex87xbdx14Q@x1f'}, {'_': 'PhotoSize', 'type': 'm', 'w': 320, 'h': 180, 'size': 13886}, {'_': 'PhotoSize', 'type': 'x', 'w': 800, 'h': 450, 'size': 48906}, {'_': 'PhotoSizeProgressive', 'type': 'y', 'w': 1200, 'h': 675, 'sizes': [10804, 24072, 30765, 43577, 73327]}], 'dc_id': 4, 'has_stickers': False, 'video_sizes': []}, 'embed_url': None, 'embed_type': None, 'embed_width': None, 'embed_height': None, 'duration': None, 'author': '@who', 'document': None, 'cached_page': None, 'attributes': []}}, 
'reply_markup': None, 
'entities': [{'_': 'MessageEntityUrl', 'offset': 0, 'length': 48}], 
'views': None, 
'forwards': None, 
'replies': None, 
'edit_date': None, 
'post_author': None, 
'grouped_id': None, 
'restriction_reason': [], 
'ttl_period': None}""")
print(
    json.dumps({
        "message": full_result["message"],
        "date": full_result["date"],
        "media": full_result["media"],
        "edit_date": full_result["edit_date"]
    }, indent=4, sort_keys=True, default=str)
)
{
    "date": "2021-12-15 02:31:58+00:00",
    "edit_date": null,
    "media": {
        "_": "MessageMediaWebPage",
        ...
    },
    "message": "Let's all go here: https://twitter.com/i/status/1470455120513142792"
}
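
To plug the parser into the DataFrame workflow from the question, something along these lines should work (a sketch only; the file name is hypothetical and the final concat mirrors the question's code, untested on the real 20GB):

import pandas as pd

df = pd.read_csv("one_of_the_files.csv", index_col=0)  # hypothetical file name
# read_dict returns (parsed_dict, end_position); keep only the dict
parsed_rows = [read_dict(s)[0] for s in df["message_dict"]]
new_df = pd.concat(
    [df["parent_msg_id"].reset_index(drop=True), pd.DataFrame(parsed_rows)],
    axis=1,
)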
