云数据流作业调度



所以,我已经完成了在Dataflow中创建一个作业。这个任务处理从PostgreSQL到BigQuery的ETL。因此,我不知道如何使用气流创建调度。能否分享如何使用气流调度作业数据流?

谢谢

您可以使用Cloud Scheduler(完全托管的cron作业调度器)/Cloud Composer(基于Apache Airflow的完全托管的工作流编排服务)调度数据流批处理作业。

要使用Cloud Scheduler调度,请参考使用Cloud Scheduler调度数据流批处理作业

要使用Cloud Composer进行调度,请参考使用DataflowTemplateOperator使用Cloud Composer启动数据流管道。

使用Java/Python sdk在Airflow中运行数据流作业的示例和更多方法请参考Google Cloud Dataflow Operators

在您的AirflowDAG中,您可以使用schedule_interval参数定义cron和调度:

with airflow.DAG(
my_dag,
default_args=args,
schedule_interval="5 3 * * *"
# Trigger Dataflow job with an operator
launch_dataflow_job = BeamRunPythonPipelineOperator(
runner='DataflowRunner',
py_file=python_main_file,
task_id='launch_dataflow_job',
pipeline_options=dataflow_job_options,
py_system_site_packages=False,
py_interpreter='python3',
dataflow_config=DataflowConfiguration(
location='region'
)
)
launch_dataflow_job
......

最新更新