我有以下文件:
binreader/
├─ packet/
│ ├─ __init__.py
│ ├─ aggregator.py
│ ├─ parser.py
│ ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py
再现错误的代码:
/packet/__init__.py
<empty>
/packet/aggregator.py
from multiprocessing import Queue, Process
import logging

log = logging.getLogger()


class AggregatorProcess(Process):
    """Middle pipeline stage: forwards items from q_in to q_out.

    Consumes items until a ``None`` sentinel arrives, then propagates the
    sentinel to ``q_out`` so the downstream consumer also shuts down.
    """

    def __init__(self, q_in: Queue, q_out: Queue):
        super().__init__()
        self.q_in = q_in    # upstream queue (parser -> aggregator)
        self.q_out = q_out  # downstream queue (aggregator -> uploader)

    def run(self):
        # Compare against None explicitly: the original truthiness test
        # (`while x := self.q_in.get():`) would stop on ANY falsy item
        # (0, "", [], ...), not only on the shutdown sentinel.
        while (x := self.q_in.get()) is not None:
            log.debug(f"Aggregator: {x}")
            self.q_out.put(x)
        log.debug("Aggregator: Done")
        self.q_out.put(None)
/packet/parser.py
from multiprocessing import Queue, Process
import logging
from typing import List

log = logging.getLogger()


class ParserProcess(Process):
    """First pipeline stage: emits each element of *data* onto *q_out*,
    then signals completion by enqueueing a ``None`` sentinel."""

    def __init__(self, data: List, q_out: Queue):
        super().__init__()
        self.data = data
        self.q_out = q_out

    def run(self):
        put = self.q_out.put  # hoist the bound method out of the loop
        for x in self.data:
            log.debug(f"Parser: {x}")
            put(x)
        log.debug("Parser: Done")
        put(None)
/packet/uploader.py
from multiprocessing import Queue, Process
import logging

log = logging.getLogger()


class UploaderProcess(Process):
    """Final pipeline stage: drains *q_in* until the ``None`` sentinel.

    Currently only logs each item; the sentinel is consumed and not
    re-emitted because nothing sits downstream of this stage.
    """

    def __init__(self, q_in: Queue) -> None:
        super().__init__()
        self.q_in = q_in  # upstream queue (aggregator -> uploader)

    def run(self):
        # Compare against None explicitly: a bare truthiness test would
        # treat any falsy item (0, "", []) as the shutdown sentinel and
        # exit early, leaving items stranded in the queue.
        while (x := self.q_in.get()) is not None:
            log.debug(f"Uploader: {x}")
        log.debug("Uploader: Done")
/__init__.py
import sys
import click
import logging

from binreader import upload_concurrent


@click.group()
def cli():
    # Root CLI group callback: configures DEBUG logging (to stdout, with
    # the process name in each record) before any subcommand runs.
    logging.basicConfig(
        format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.StreamHandler(sys.stdout),
        ],
    )


# NOTE(review): these two statements execute at *import time* of the
# `binreader` package.  Under the spawn/forkserver start methods each
# child process re-imports the package and therefore re-invokes cli(),
# which is what produces the "bootstrapping phase" RuntimeError shown
# below.  The invocation belongs in __main__.py, guarded by
# `if __name__ == "__main__":` — confirm against how the module is run.
cli.add_command(upload_concurrent.upload_data_concurrent)
cli()
/__main__.py
<empty>
/upload_concurrent.py
from multiprocessing import Queue
# Bug fix: `logging` was referenced below without being imported, so this
# module raised NameError on import.
import logging

import click

from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess

log = logging.getLogger()


@click.command(name="upload-concurrent")
def upload_data_concurrent():
    """Run the parser -> aggregator -> uploader pipeline concurrently.

    Wires three Process stages together with two queues, starts them
    all, then blocks until every stage has drained its input and exited.
    """
    parser_agg_wq = Queue()  # parser -> aggregator
    agg_upl_wq = Queue()     # aggregator -> uploader

    parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
    parser.name = type(parser).__name__  # readable processName in log records
    aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
    aggregator.name = type(aggregator).__name__
    uploader = UploaderProcess(agg_upl_wq)
    uploader.name = type(uploader).__name__

    parser.start()
    aggregator.start()
    uploader.start()
    # Each stage exits after consuming/emitting its None sentinel, so the
    # joins complete once the data has flowed all the way through.
    parser.join()
    aggregator.join()
    uploader.join()
我有完成处理的同步代码,但速度太慢,大约为1小时/GB。大约有1.5TB的数据需要每两周处理一次。
在引入多进程处理时,每次调用 Process.start() 都会出现以下错误:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
此程序作为模块运行:python -m binreader upload-concurrent
我读过这个问题,但我不确定在哪里添加if __name__ == '__main__':
防护。这可能不是一个可行的解决方案,因为这是使用点击模块,我不确定这对模块的启动/运行方式有什么影响。
非常感谢任何指导
`if __name__ == '__main__':` 防护需要放在程序真正的入口模块里——也就是错误信息中所说的 main module(这个叫法虽然正确,但确实容易让人困惑)。对于 `python -m binreader` 这种运行方式,入口模块是 `__main__.py`:应当把 `cli.add_command(...)` 和 `cli()` 这两处顶层调用从 `__init__.py` 移到 `__main__.py`,并放在该防护之内,这样子进程在重新导入包时就不会再次触发它们。