I'm having trouble passing the "setup_file" parameter to my BeamRunPythonPipelineOperator. Here is the traceback from the Composer logs.
[2022-11-16, 05:03:19 UTC] {beam.py:127} WARNING - error: [Errno 2] No such file or directory: 'csv_converter-0.0.1/csv_converter.egg-info/PKG-INFO'
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - Traceback (most recent call last):
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - File "/opt/python3.8/lib/python3.8/site-packages/apache_beam/utils/processes.py", line 89, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - out = subprocess.check_output(*args, **kwargs)
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - File "/opt/python3.8/lib/python3.8/subprocess.py", line 415, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - File "/opt/python3.8/lib/python3.8/subprocess.py", line 516, in run
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - raise CalledProcessError(retcode, process.args,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - subprocess.CalledProcessError: Command '['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpifl6ty8k']' returned non-zero exit status 1.
I don't understand why I get this [Errno 2] No such file or directory. Some DAG runs complete fine, while others report this error. Sometimes I get different errors instead, such as another file referenced by setup.py not being found, or an [Errno 5] Input/output error.
Here is my operator:
BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner="DataflowRunner",
    py_file=f'/home/airflow/gcs/data/csv_converter/main.py',
    pipeline_options={
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,
        'no_use_public_ips': True,
        'subnetwork': subnetwork,
        'staging_location': staging_location,
        'temp_location': temp_location,
        "setup_file": f'/home/airflow/gcs/data/csv_converter/setup.py',
        "machine_type": "n1-standard-4",
        "num_workers": 5,
        "max_num_workers": 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id="dataflow_conn"
    ),
)
This error is very frustrating, because I don't know how to fix it and I haven't found anyone else running into the same problem.
Some context: our process triggers the DAG whenever a .csv lands in a bucket. At first I thought it was a scheduler/concurrency issue, since we had some zombie tasks. With 2 schedulers and 2 vCPUs, I noticed CPU usage around 80% (constantly stuck at 3/4 vCPUs, even when DAGs were triggered in bursts as several .csv files landed). I tried increasing to 4 schedulers and 4 vCPUs, but the problem persists. I would expect this process to install my package correctly.
- Composer version: 2.0.31
- Airflow version: 2.3.3
- apache-airflow-providers-google version: 8.1.0
- apache-beam version: 2.41.0
I've had this problem before, and I think placing setup.py at the root of the Composer DAG folder will solve it.

I'd also recommend deploying your Beam job folder under dags rather than data; data is intended more for Airflow Variables.

Example:
import os

# Get DAG folder via an env var
dag_folder = os.getenv("DAGS_FOLDER")

BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner="DataflowRunner",
    py_file=f'/home/airflow/gcs/dags/csv_converter/main.py',
    pipeline_options={
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,
        'no_use_public_ips': True,
        'subnetwork': subnetwork,
        'staging_location': staging_location,
        'temp_location': temp_location,
        'setup_file': f"{dag_folder}/setup.py",
        "machine_type": "n1-standard-4",
        "num_workers": 5,
        "max_num_workers": 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id="dataflow_conn"
    ),
)
Explanation:
- setup.py is placed at the root of the DAG folder: {composer_bucket}/dags/setup.py
- dag_folder is retrieved from a Composer environment variable
- 'setup_file': f"{dag_folder}/setup.py" is then passed as a pipeline option to the Beam operator, as shown above
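
For reference, here is a minimal sketch of what such a setup.py could look like; the package name and the pinned dependency are assumptions for illustration, not taken from the question:

# Hypothetical minimal setup.py, placed at {composer_bucket}/dags/setup.py.
# Package name and dependency pin below are illustrative assumptions.
import setuptools

setuptools.setup(
    name="csv_converter",
    version="0.0.1",
    packages=setuptools.find_packages(),
    install_requires=[
        "apache-beam[gcp]==2.41.0",  # matches the Beam version reported in the question
    ],
)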