Composer throws an error when installing setup.py with BeamRunPythonPipelineOperator

I'm having trouble passing the "setup_file" argument to my BeamRunPythonPipelineOperator. Here is the traceback from the Composer logs.

[2022-11-16, 05:03:19 UTC] {beam.py:127} WARNING - error: [Errno 2] No such file or directory: 'csv_converter-0.0.1/csv_converter.egg-info/PKG-INFO'
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - Traceback (most recent call last):
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/site-packages/apache_beam/utils/processes.py", line 89, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     out = subprocess.check_output(*args, **kwargs)
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/subprocess.py", line 415, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/subprocess.py", line 516, in run
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     raise CalledProcessError(retcode, process.args,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - subprocess.CalledProcessError: Command '['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpifl6ty8k']' returned non-zero exit status 1.

I have no idea why this [Errno 2] No such file or directory happens. Some DAG runs work fine, while others report this error. Sometimes I get different errors, such as another file from setup.py not being found, or [Errno 5] Input/output error.
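For reference, the failing command is visible in the traceback, so the sdist step can be replayed outside the operator. A sketch, using the package path from my operator below:

# Sketch: replay the exact sdist build that Beam runs (see the traceback above).
# The package path is the one used in the operator below.
import subprocess
import tempfile

package_dir = "/home/airflow/gcs/data/csv_converter"

with tempfile.TemporaryDirectory() as dist_dir:
    # Mirrors: /usr/bin/python3 setup.py sdist --dist-dir /tmp/tmpXXXX
    out = subprocess.check_output(
        ["python3", "setup.py", "sdist", "--dist-dir", dist_dir],
        cwd=package_dir,
        stderr=subprocess.STDOUT,
    )
    print(out.decode())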

Here is my operator:

BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner="DataflowRunner",
    py_file='/home/airflow/gcs/data/csv_converter/main.py',
    pipeline_options={
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,
        'no_use_public_ips': True,
        'subnetwork': subnetwork,
        'staging_location': staging_location,
        'temp_location': temp_location,
        'setup_file': '/home/airflow/gcs/data/csv_converter/setup.py',
        'machine_type': 'n1-standard-4',
        'num_workers': 5,
        'max_num_workers': 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id="dataflow_conn",
    ),
)

This error is extremely frustrating because I don't know how to fix it, and I haven't found anyone running into the same problem.

Some context: our process triggers the DAG whenever a .csv lands in a bucket. At first I thought it was a scheduler and concurrency problem, since we had some zombie tasks. With 2 schedulers and 2 vCPUs each, we noticed CPU usage around 80% (constantly stuck at 3/4 vCPUs, even when DAGs were triggered in bursts as multiple .csv files landed). I tried increasing to 4 schedulers with 4 vCPUs, but the problem persists. I expect the process to install my package correctly.

  • Composer version: 2.0.31
  • Airflow version: 2.3.3
  • apache-airflow-providers-google version: 8.1.0
  • apache-beam version: 2.41.0

I have had this problem before, and I think using setup.py at the root of the Composer DAG folder will solve your problem.

I also recommend deploying your Beam job folder to dags rather than data.

data is intended more for Airflow Variables.

Example:

import os

# Get the DAG folder via an env var
dag_folder = os.getenv("DAGS_FOLDER")

BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner="DataflowRunner",
    py_file='/home/airflow/gcs/dags/csv_converter/main.py',
    pipeline_options={
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,
        'no_use_public_ips': True,
        'subnetwork': subnetwork,
        'staging_location': staging_location,
        'temp_location': temp_location,
        'setup_file': f"{dag_folder}/setup.py",
        'machine_type': 'n1-standard-4',
        'num_workers': 5,
        'max_num_workers': 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id="dataflow_conn",
    ),
)

Explanation:

  • setup.py is placed at the root of the DAG folder: {composer_bucket}/dags/setup.py
  • dag_folder is retrieved from the Composer DAGS_FOLDER env var
  • 'setup_file': f"{dag_folder}/setup.py" is passed as an option to the Beam operator, as shown above
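For completeness, here is a minimal setup.py sketch that sdist can package. The name and version match the traceback above; the dependency list is an assumption to adapt to your project:

# Minimal setup.py sketch at {composer_bucket}/dags/setup.py.
# Name/version follow the traceback above; install_requires is an
# assumption -- list your pipeline's extra dependencies there.
import setuptools

setuptools.setup(
    name="csv_converter",
    version="0.0.1",
    packages=setuptools.find_packages(),
    install_requires=[],
)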
