我是python新手。所以,如果我没有以python的方式提问,请原谅我。
我的要求如下:
-
我需要写python代码来实现这个要求。
-
将读取60 json文件作为输入。每个文件大约150gb。
-
所有60个json文件的示例结构如下所示。请注意,每个文件将只有一个json对象。每个文件的巨大大小是因为数组元素的数量和大小数组包含在一个巨大的json对象中
{"string_1"abc"string_1"abc"string_1"abc"string_1"abc"string_1"abc"string_1"abc"array_element" []}
-
转换逻辑很简单。我需要合并所有的array_element从所有60个文件,并将其写入一个巨大的json文件。这几乎是150GB x60将是输出json文件的大小。
我请求您帮助的问题:
-
阅读:计划使用"ijson"模块的ijson。项(file_object,"array_element"。你能告诉我我是否。项目将"yield";(即不是将整个文件加载到内存中)从"array_element"json文件中的数组?我不认为json。加载在这里是一个选项,因为我们无法在内存中容纳如此庞大的字典。
-
写作:我计划使用ijson读取每个项目。项,并执行json。转储到"encode"然后使用file_object将其写入文件。写和不使用json。因为我不能在内存中有这么大的字典来使用json.dump。你能让我知道如果f.r ush()应用在下面的代码中显示是需要的吗?据我的理解,内部缓冲区将自动刷新自己,当它是满的,内部缓冲区的大小是恒定的,不会动态增长到一定程度,它会使内存过载?请让我知道
-
有没有更好的方法来增量读取和写入巨大的json文件上面提到的?
显示上述读写逻辑的代码片段:
for input_file in input_files:
with open("input_file.json", "r") as f:
objects = ijson.items(f, "array_element")
for item in objects:
str = json.dumps(item, indent=2)
with open("output.json", "a") as f:
f.write(str)
f.write(",n")
f.flush()
with open("output.json", "a") as f:
f.seek(0,2)
f.truncate(f.tell() - 1)
f.write("]n}")
希望我已经把我的问题问清楚了。提前感谢!!
下面的程序假设输入文件具有足够可预测的格式,可以为了性能而跳过JSON解析。
根据你的描述,我的假设是:
- 所有文件具有相同的编码
- 所有文件都有一个单一的位置,在开始的某个地方可以找到
"array_element":[
,之后是"有趣的部分"。文件以 开头 - 所有文件在末尾都有一个位置,
]}
标志着"感兴趣的部分"的结束。 - 所有"有趣的部分";可以用逗号连接,仍然是有效的JSON
当所有这些点都为真时,将预定义的头片段、相应的文件范围和页脚片段连接起来将生成一个大的、有效的JSON文件。
import re
import mmap
head_pattern = re.compile(br'"array_element"s*:s*[s*', re.S)
tail_pattern = re.compile(br's*]s*}s*$', re.S)
input_files = ['sample1.json', 'sample2.json']
with open('result.json', "wb") as result:
head_bytes = 500
tail_bytes = 50
chunk_bytes = 16 * 1024
result.write(b'{"JSON": "fragment", "array_element": [n')
for input_file in input_files:
print(input_file)
with open(input_file, "r+b") as f:
mm = mmap.mmap(f.fileno(), 0)
start = head_pattern.search(mm[:head_bytes])
end = tail_pattern.search(mm[-tail_bytes:])
if not (start and end):
print('unexpected file format')
break
start_pos = start.span()[1]
end_pos = mm.size() - end.span()[1] + end.span()[0]
if input_files.index(input_file) > 0:
result.write(b',n')
pos = start_pos
mm.seek(pos)
while True:
if pos + chunk_bytes >= end_pos:
result.write(mm.read(end_pos - pos))
break
else:
result.write(mm.read(chunk_bytes))
pos += chunk_bytes
result.write(b']n}')
如果文件格式是100%可预测的,您可以抛弃正则表达式并使用mm[:head_bytes].index(b'...')
等作为开始/结束位置算法。