Error after triggering a DAG in Airflow (Cloud Composer)



I can't figure out what the problem is: my DAG is triggered and tries to create a Dataproc cluster, but the task fails. Below is the log:

```
[2021-02-02 12:13:55,321] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: hourly_processing.create_dataproc_cluster 2021-02-02T12:13:47.063326+00:00 [queued]>
[2021-02-02 12:13:55,322] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-02-02 12:13:55,322] {taskinstance.py:882} INFO - Starting attempt 1 of 1
[2021-02-02 12:13:55,323] {taskinstance.py:883} INFO -
--------------------------------------------------------------------------------
[2021-02-02 12:13:55,390] {taskinstance.py:902} INFO - Executing <Task(BashOperator): create_dataproc_cluster> on 2021-02-02T12:13:47.063326+00:00
[2021-02-02 12:13:55,395] {standard_task_runner.py:54} INFO - Started process 3911 to run task
[2021-02-02 12:13:55,458] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'hourly_processing', 'create_dataproc_cluster', '2021-02-02T12:13:47.063326+00:00', '--job_id', '75', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/hourlyprocess.py', '--cfg_path', '/tmp/tmpfoqft1ib']
[2021-02-02 12:13:55,460] {standard_task_runner.py:78} INFO - Job 75: Subtask create_dataproc_cluster
[2021-02-02 12:13:56,127] {logging_mixin.py:112} INFO - Running <TaskInstance: hourly_processing.create_dataproc_cluster 2021-02-02T12:13:47.063326+00:00 [running]> on host airflow-worker-5b45bf88bd-mmbvk
[2021-02-02 12:13:56,224] {bash_operator.py:114} INFO - Tmp dir root location:
/tmp@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:56,226] {bash_operator.py:137} INFO - Temporary script location: /tmp/airflowtmp2njq_lg4/create_dataproc_clusterc4xad95g@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:56,226] {bash_operator.py:147} INFO - Running command: gcloud dataproc clusters create hourly-job --bucket my-per-project-composer-stg --region europe-west1 --zone europe-west1-b --subnet projects/my-per-project/regions/europe-west1/subnetworks/rtm-test-vpc --project my-per-project --service-account 136552104597-compute@developer.gserviceaccount.com --master-machine-type n1-standard-8 --master-boot-disk-size 500 --worker-machine-type n1-standard-16 --worker-boot-disk-size 500 --num-workers 10 --image-version 1.5-debian10 --properties core:fs.gs.implicit.dir.repair.enable=false,core:fs.gs.status.parallel.enable=true --no-address@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:56,396] {bash_operator.py:154} INFO - Output:@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,405] {bash_operator.py:158} INFO - Waiting on operation [projects/my-per-project/regions/europe-west1/operations/6a80afd5-1a32-3bf0-8501-5b69385c9175].@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,412] {bash_operator.py:158} INFO - Waiting for cluster creation operation...@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,469] {bash_operator.py:158} INFO - WARNING: For PD-Standard without local SSDs, we strongly recommend provisioning 1TB or larger to ensure consistently high I/O performance. See https://cloud.google.com/compute/docs/disks/performance for information on disk I/O performance.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:13:58,471] {bash_operator.py:158} INFO - WARNING: Subnetwork 'rtm-test-vpc' does not support Private Google Access which is required for Dataproc clusters when 'internal_ip_only' is set to 'true'. Enable Private Google Access on subnetwork 'rtm-test-vpc' or set 'internal_ip_only' to 'false'.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,316] {bash_operator.py:158} INFO - ......done.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,381] {bash_operator.py:158} INFO - ERROR: (gcloud.dataproc.clusters.create) Operation [projects/my-per-project/regions/europe-west1/operations/6a80afd5-1a32-3bf0-8501-5b69385c9175] failed: Multiple Errors:@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,382] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-m to report in.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,383] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-w-0 to report in.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,384] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-w-1 to report in.@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,387] {bash_operator.py:158} INFO -  - Timeout waiting for instance hourly-job-w-9 to report in..@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,561] {bash_operator.py:162} INFO - Command exited with return code 1@-@{"workflow": "hourly_processing", "task-id": "create_dataproc_cluster", "execution-date": "2021-02-02T12:13:47.063326+00:00"}
[2021-02-02 12:44:13,615] {taskinstance.py:1152} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/bash_operator.py", line 166, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2021-02-02 12:44:13,618] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=hourly_processing, task_id=create_dataproc_cluster, execution_date=20210202T121347, start_date=20210202T121355, end_date=20210202T124413
[2021-02-02 12:44:13,718] {local_task_job.py:159} WARNING - State of this instance has been externally set to failed. Taking the poison pill.
[2021-02-02 12:44:13,721] {helpers.py:325} INFO - Sending Signals.SIGTERM to GPID 3911
[2021-02-02 12:44:13,898] {helpers.py:291} INFO - Process psutil.Process(pid=3911, status='terminated', exitcode=1, started='12:13:54') (3911) terminated with exit code 1
[2021-02-02 12:44:13,900] {local_task_job.py:102} INFO - Task exited with return code 1
```
Code snippet that creates the DAG:
```
from airflow import models
from airflow.operators import bash_operator


def create_dag():
    with models.DAG(
            dag_id='Hourly_processing',
            schedule_interval=None,
            default_args=default_dag_args) as dag:

        # cluster_name, bucket_name, zone, etc. are defined elsewhere in the DAG file.
        create_dataproc_cluster_command = (
            'gcloud dataproc clusters create ' + cluster_name +
            ' --bucket ' + bucket_name +
            ' --zone ' + zone +
            ' --subnet ' + sub_network_uri +
            ' --region ' + region +
            ' --project ' + project_id +
            ' --service-account ' + service_account +
            ' --master-machine-type ' + master_machine_type +
            ' --master-boot-disk-size 500' +
            ' --worker-machine-type ' + worker_machine_type +
            ' --worker-boot-disk-size 500' +
            ' --num-workers ' + str(no_worker_machine) +
            ' --image-version ' + image +
            ' --properties ' + properties +
            ' --no-address'
        )

        create_dataproc_cluster = bash_operator.BashOperator(
            task_id='create_dataproc_cluster',
            bash_command=create_dataproc_cluster_command)

        # Return the DAG so it can be assigned at module level.
        return dag
```
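
Airflow only schedules DAG objects that exist in a module's global namespace, so the object built by create_dag() has to be assigned to a module-level name. A minimal sketch of that registration (the original snippet does not show this part, so it is an assumption about the rest of the file):

```
# Hypothetical module-level registration: the scheduler's DagBag only
# picks up DAG objects found in the module's global namespace.
dag = create_dag()
```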

The Dataproc cluster starts provisioning but eventually fails with the error above. Any help is much appreciated. Thanks in advance.

Found the issue: I had to enable Private Google Access on the VPC subnetwork being used.
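
One way to do that (a sketch using the subnetwork, region, and project that appear in the log above; adjust the names if yours differ) is to update the subnetwork with gcloud:

```
# Enable Private Google Access on the subnetwork, so cluster VMs created
# with --no-address (internal IPs only) can still reach Google APIs.
gcloud compute networks subnets update rtm-test-vpc \
    --project my-per-project \
    --region europe-west1 \
    --enable-private-ip-google-access
```

As the warning in the log notes, the alternative is to stop forcing internal-IP-only instances (drop --no-address), but enabling Private Google Access keeps the cluster on internal IPs.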
