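I found that using multiprocessing inside Airflow raises an AssertionError. I resolved the error (see this discussion and this discussion), but I was curious how processes actually behave inside an Airflow job, so I ran the following code: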
```python
import concurrent.futures
import multiprocessing
import os
import time

from airflow import models
from airflow.operators.python import PythonOperator


def process_function(i):
    # parent_process() returns None in a process not started by multiprocessing
    parent = multiprocessing.parent_process()
    parent_process = parent.pid if parent is not None else None
    parent_process_daemon = parent.daemon if parent is not None else None
    current = multiprocessing.current_process()
    current_process = current.pid
    is_daemon = current.daemon
    # (label text kept identical to the log output below)
    result = (
        f"{i}th task : "
        f"partent_process : {parent_process} is daemon : {parent_process_daemon} "
        f"current_process : {current_process} is daemon : {is_daemon}"
    )
    time.sleep(3)
    return result


def mp(run_n: int):
    print("start checking multiprocessing pid task")
    print("[1] check pid using os modlue")
    print(f"parent process : {os.getppid()} current process : {os.getpid()}")
    print("[2] check pid using multiprocessing modlue")
    parent = multiprocessing.parent_process()
    current = multiprocessing.current_process()
    print(
        f"parent process : {parent.pid if parent is not None else None} "
        f"is daemon? : {parent.daemon if parent is not None else None} "
        f"process : {current.pid} is daemon? : {current.daemon}"
    )
    print("start job")
    with concurrent.futures.ProcessPoolExecutor() as process_executor:
        results = list(process_executor.map(process_function, range(run_n)))
    print("job done")
    for c in results:
        print(c)


...  # default_args and other setup elided in the original post

with models.DAG(
    dag_id="daemon_test",
    description="daemon_test",
    schedule_interval="0 * * * *",
    default_args=default_args,
    catchup=False,
) as dag:
    test_job = PythonOperator(
        task_id="test_job",  # task_id must not contain spaces
        python_callable=mp,
        op_kwargs={"run_n": 5},
    )
```
And the result:
```text
{standard_task_runner.py:52} INFO - Started process 67070 to run task
...
start checking multiprocessing pid task
[1] check pid using os modlue
parent process : 67069 current process : 67070
[2] check pid using multiprocessing modlue
parent process : None is daemon? : None process : 67070 is daemon? : True
...
0th task : partent_process : 67070 is daemon : False current_process : 67071 is daemon : True
1th task : partent_process : 67070 is daemon : False current_process : 67072 is daemon : True
2th task : partent_process : 67070 is daemon : False current_process : 67073 is daemon : True
3th task : partent_process : 67070 is daemon : False current_process : 67074 is daemon : True
4th task : partent_process : 67070 is daemon : False current_process : 67071 is daemon : True
```
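I understand the PID values can vary, but I didn't expect `None` to show up at all: `os.getppid()` reports parent 67069, yet `multiprocessing.parent_process()` returns `None`. Does anyone know why?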
You're right; a PID (or a parent PID) can't be `None`.
`multiprocessing.parent_process()` returns the value of the (multiprocessing-internal) global `multiprocessing.process._parent_process`.
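A quick way to see this is to compare the public function with the private global. This is a minimal sketch, assuming CPython 3.8+ (where `parent_process()` was added); `_parent_process` is an undocumented implementation detail that could change between versions, and `show_parent_lookup` is a hypothetical helper name.

```python
import multiprocessing
import multiprocessing.process


def show_parent_lookup():
    # parent_process() simply returns the module-level global
    # multiprocessing.process._parent_process (private CPython detail).
    public = multiprocessing.parent_process()
    private = multiprocessing.process._parent_process
    return public, private


if __name__ == "__main__":
    public, private = show_parent_lookup()
    # In a top-level script (not started by multiprocessing) both are None.
    print(public, private)
```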
That global is set only in a child process, and only when the child was started by `multiprocessing` itself.
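To see the global being populated, here is a sketch that starts a child through `multiprocessing` and reports back what the child sees. It assumes a POSIX system and explicitly selects the `"fork"` start method so it can run as a single script; `report_parent` and `run_child` are hypothetical names.

```python
import multiprocessing
import os


def report_parent(queue):
    # Runs in a child started by multiprocessing: its bootstrap has set the
    # parent-process global, so parent_process() is not None here.
    parent = multiprocessing.parent_process()
    queue.put((os.getpid(), os.getppid(), parent.pid if parent is not None else None))


def run_child():
    # Assumption: POSIX system; "fork" is chosen explicitly so the example
    # works as a single script without re-importing __main__ in the child.
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    child = ctx.Process(target=report_parent, args=(queue,))
    child.start()
    result = queue.get()
    child.join()
    return result


if __name__ == "__main__":
    child_pid, child_ppid, mp_parent_pid = run_child()
    # Both the OS-level and the multiprocessing-level parent PID equal our PID.
    print(os.getpid(), child_pid, child_ppid, mp_parent_pid)
```

This mirrors the worker lines in the log, where each `ProcessPoolExecutor` worker reports the task process (67070) as its parent.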
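`os.getppid()`, on the other hand, just calls the OS function to get the parent PID, regardless of whether the parent is a `multiprocessing` parent or something else.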
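By contrast, a process created with a plain `os.fork()` never runs multiprocessing's bootstrap, so the global is not updated: the child just inherits whatever value the forking process had (`None` in a top-level script), while `os.getppid()` still returns the real parent PID. The sketch below assumes a POSIX system; `fork_and_inspect` is a hypothetical helper. Airflow's `StandardTaskRunner` generally starts the task with `os.fork()`, which would be consistent with the `None` in the log, but treat that link as an assumption rather than a confirmed explanation.

```python
import multiprocessing
import os


def fork_and_inspect():
    """Fork with raw os.fork() (POSIX only), bypassing multiprocessing's
    bootstrap, and return (child_pid, child's getppid(), child's
    parent_process().pid as text)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: the parent-process global is inherited unchanged from the
        # forking process, because multiprocessing never ran here.
        try:
            os.close(read_fd)
            parent = multiprocessing.parent_process()
            msg = f"{os.getppid()} {parent.pid if parent is not None else None}"
            os.write(write_fd, msg.encode())
        finally:
            os._exit(0)  # never return into the caller's code in the child
    # Parent: read the child's report, then reap it.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        data = reader.read()
    os.waitpid(pid, 0)
    ppid_text, mp_parent_text = data.split()
    return pid, int(ppid_text), mp_parent_text


if __name__ == "__main__":
    print(os.getpid(), fork_and_inspect())
```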