I'm a bit of a Python beginner, but a school project requires me to run classification algorithms on this popular Reddit dataset. The files are huge .zst files, which can be found here: https://files.pushshift.io/reddit/submissions/ Anyway, I'm just not sure how to extract them into a database, since so far our assignments have only used .csv datasets, which I can easily load into a pandas DataFrame. I stumbled upon a different post and tried this code:
def transform_zst_file(self, infile):
    zst_num_bytes = 2**22
    lines_read = 0
    dctx = zstd.ZstdDecompressor()
    with dctx.stream_reader(infile) as reader:
        previous_line = ""
        while True:
            chunk = reader.read(zst_num_bytes)
            if not chunk:
                break
            string_data = chunk.decode('utf-8')
            lines = string_data.split("\n")
            for i, line in enumerate(lines[:-1]):
                if i == 0:
                    line = previous_line + line
                self.appendData(line, self.type)
                lines_read += 1
                if self.max_lines_to_read and lines_read >= self.max_lines_to_read:
                    return
            previous_line = lines[-1]
But I'm not entirely sure how to get this into a pandas DataFrame, or, if the file is too large, how to load only a certain percentage of the data points. Any help would be greatly appreciated!
The following code just crashes my computer every time I try to run it:
import zstandard as zstd

your_filename = "..."
with open(your_filename, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data)
That is probably due to the sheer size of the file. Is it possible to extract just a portion of it into a pandas DataFrame?
The file has been compressed with Zstandard (https://github.com/facebook/zstd), a compression library.
The easiest thing for you to do is probably to install python-zstandard (https://pypi.org/project/zstandard/) with
pip install zstandard
and then run something like the following in a Python script:
import zstandard as zstd

your_filename = "..."
with open(your_filename, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data)
Now you can either use the decompressed data directly, or write it to a file and then load that into pandas. Good luck!
Starting with version 1.4, pandas can decompress Zstandard (.zst) natively. Before that, native support covered ".gz", ".bz2", ".zip" and ".xz" compression.
If a file ends with the .zst suffix, pandas infers the compression by default and can read the file directly.
import pandas
df = pandas.read_csv('my_file.csv.zst')
# Being equivalent to
# df = pandas.read_csv('my_file.csv.zst', compression='zstd')
# for files ending with .zst
See the pandas read_csv documentation for more information.
Unlike Bimba's answer, this one operates on one line at a time and does not read everything into memory. That is very useful if you are operating on compressed newline-delimited data that is larger than available memory.
import io
import json
from pathlib import Path

import zstandard as zstd

DCTX = zstd.ZstdDecompressor(max_window_size=2**31)

def read_lines_from_zst_file(zstd_file_path: Path):
    with (
        zstd.open(zstd_file_path, mode='rb', dctx=DCTX) as zfh,
        io.TextIOWrapper(zfh) as iofh
    ):
        for line in iofh:
            yield line

if __name__ == "__main__":
    file = Path('some_zstd_file.zst')
    records = map(json.loads, read_lines_from_zst_file(file))
    for record in records:
        print(record.get('some-field'))
I stumbled upon a similar Reddit dataset consisting of zst dumps. To iterate over the contents of your zst file, I used the following code, which you can run as a script:
import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers

log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())

def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = ''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = reader.read(2**27).decode()
            if not chunk:
                break
            lines = (buffer + chunk).split("\n")
            for line in lines[:-1]:
                yield line, file_handle.tell()
            buffer = lines[-1]
        reader.close()

if __name__ == "__main__":
    file_path = sys.argv[1]
    file_size = os.stat(file_path).st_size
    file_lines = 0
    file_bytes_processed = 0
    created = None
    field = "subreddit"
    value = "wallstreetbets"
    bad_lines = 0
    try:
        for line, file_bytes_processed in read_lines_zst(file_path):
            try:
                obj = json.loads(line)
                created = datetime.utcfromtimestamp(int(obj['created_utc']))
                temp = obj[field] == value
            except (KeyError, json.JSONDecodeError) as err:
                bad_lines += 1
            file_lines += 1
            if file_lines % 100000 == 0:
                log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
    except Exception as err:
        log.info(err)
    log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
I used TextIOWrapper from the io module:
import io
import json
import zstandard

with open(file_name, 'rb') as fh:
    dctx = zstandard.ZstdDecompressor(max_window_size=2147483648)
    stream_reader = dctx.stream_reader(fh)
    text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
    for line in text_stream:
        obj = json.loads(line)
        # HANDLE OBJECT LOGIC HERE
There may be simpler ways to achieve this, but to convert a zst from the Reddit dataset dumps into a valid JSON file with Python, I ended up using:
import zstandard as zstd

zst = '/path/to/file.zst'
with open(zst, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data, max_output_size=1000000000)  # 1GB

with open("/path/to/file.json", "w+") as f:
    f.write("[" + decompressed.decode("utf-8").strip().replace("\n", ",") + "]")
To read the JSON file:
import json

with open("/path/to/file.json") as f:
    data = json.load(f)

for d in data:
    print(d)
And there is always a bash script to the rescue, which seems even easier (remember to install zstd and jq):
set -euxo pipefail
cat "/path/to/file.zst" | zstd -d | jq --compact-output '.created_utc = (.created_utc | tonumber)' > "/path/to/file.json"