拆分大型json文件,并将每个部分分配给一个进程



我正在尝试用多个进程来精化一个大的JSON数据集~14 GiB(1200万行(,这样我可以更快。我创建了两个multiprocessing.Queue实例,in_qout_q。他们会读取data-2021-09-29.jsonl文件中的数据(这是数据集(,输出包含我感兴趣的数据的行,并将其写入另一个文件stokes_DE_file.jsonl(这是我细化数据集的部分,输出文件是数据集的细化版本(。我的机器上有16个CPU,假设我想将文件拆分为16个部分,行数将是变量lines_PER_PROCESS。如何为每个进程分配文件的一部分?以下是我迄今为止编写的代码。从python中的多处理模块开始。这是我的代码:

import multiprocessing as mp
import threading
import json
import reverse_geocoder as rg
LINES_PER_PROCESS = 12137928/(mp.cpu_count()-1)
def geocode_worker(in_q, out_q):
while True:
strike = in_q.get()
if strike is None:
out_q.put(None)
return
strike_location = (strike['lat'], strike['lon'])
if rg.search(strike_location)[0]['cc'] == 'DE':
out_q.put('{}n'.format(strike))
def file_write_worker(out_q, fileobj, worker_count):
while worker_count:
for msg in iter(out_q.get, None):

if msg is None:
worker_count -= 1
fileobj.write(msg)
def get_germany_strokes(jsonl_file):

worker_count = mp.cpu_count() - 1
in_q, out_q = mp.Queue(), mp.Queue()
processes = [mp.Process(target=geocode_worker, args=(in_q, out_q)) for _ in range(worker_count)]

for p in processes:
p.start()

with open('strokes_DE_file.json', newline='') as strokes_DE_file:
file_writer = threading.Thread(target=file_write_worker, args=(out_q, strokes_DE_file, worker_count))
file_writer.start()

with open(jsonl_file, newline='') as file:
next(file)
for line in file:
strokes = json.loads(line)['strokes']
for strike in strokes:
in_q.put(strike)

get_germany_strokes('data-2021-09-29.jsonl')

我找到的解决方案是在Linux中使用split -l命令拆分数据集,并指定每个子集的行数。我将每个子集分配给一个进程,总共15个进程大约是要创建的最佳进程数量(考虑到开销/负载平衡(。

相关内容

  • 没有找到相关文章

最新更新