从Cloud Function的文件到达事件触发composer DAG上的任务



我可以从云功能触发气流任务吗?

基本上我的问题是这个。我有一些文件到达谷歌云存储。同一DAG中的多个文件。我需要在文件到达时触发转换作业。我在考虑使用云功能。但在我的DAG中有很多依赖性的工作。

感谢提供的任何帮助

您不一定需要Cloud Function来感知GCS中的文件,Composer具有可用于实现此目的的GCS传感器。

假设您必须监视bucket/folder/file_*.csv中的文件,然后:

from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor 
import datetime as dt
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
lasthour = dt.datetime.now() - dt.timedelta(hours=1)
args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
}
dag = DAG(
dag_id='GCS_sensor_dag',
schedule_interval=None,
default_args=args
)
GCS_File_list = GoogleCloudStorageListOperator(
task_id= 'list_Files',
bucket= 'bucketname',
prefix='folder/file_',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default',
dag = dag
)
file_sensor = GoogleCloudStoragePrefixSensor(
task_id='gcs_polling',  
bucket='bucketname',
prefix='folder/file_',
dag=dag
)
trigger = TriggerDagRunOperator(
task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
trigger_dag_id="GCS_sensor_dag",
dag=dag
)
file_sensor >> GCS_File_list >> trigger

您可以触发DAG来响应云存储桶中的更改。为了实现这一点,Cloud Composer DAG可以由Cloud函数触发。已经有很好的官方文档和Codelabs来描述工作流程。它将按以下方式工作:

  1. 上传一个文件到云存储桶,这将
  2. 使用Python/Node.JS运行时触发云函数
  3. 此函数将在Cloud Composer中执行DAG

记住一件事。当您将处于Creating your function步骤时。您需要填写这一行:const WEBSERVER_ID = 'your-tenant-project-id';。要检索该变量,请转到Airflow UI,而不是Admin -> Configuration,然后搜索base_url键,即webserver-id(不包括https://.appspot.com部分(。

另一种方法是使用以下命令:

gcloud composer environments describe <ENVIRONMENT_NAME> --location <LOCATION>

您将能够看到config:->airflowUri变量。

我试过一次这种场景,效果很好。请随意提出更多问题。我希望你觉得以上信息有用。

我认为Cloud函数和GoogleCloudStoragePrefixSensor之间存在差异。只要发生任何事件(如文件到达(,云功能就会触发。这样就可以保证只拾取最新的文件。相比之下,GoogleCloudStoragePrefixSensor在预定义的时间运行(作为DAG的一部分(,并在给定的位置拾取匹配前缀的EXISTING文件。例如,在日常工作中,今天的文件到达并进行了处理。若那个文件继续放在那个里,那个么同样的文件可能会被这个传感器拾取。因此,在应用时需要小心。当然,你可以使用插件智能来获取最新的文件,并在处理后归档文件,除此之外,其他都必须添加。

为了使所描述的过程自动化,我们可以将Google Cloud Dataflow用于流式处理和批处理管道,将Google BigQuery用于数据存储,将Apache Airflow用于工作流管理。以下是逻辑概述,然后是流式处理和批处理管道的图表和示例代码,以及第一个和第二个Airflow DAG。

逻辑:

  1. 流式数据流管道不断地获取数据,并将其近乎实时地存储到BigQuery中
  2. 在BigQuery中存储数据后,会向Airflow DAG发送一个触发器,以启动批处理数据流管道
  3. 批处理数据流管道处理加载到BigQuery中的数据
  4. 一旦批处理作业完成,它将触发下一个批处理作业,并且该过程将继续

图表:

+------------------+
| Streaming Pipeline |
|   (Dataflow)      |
+--------+---------+
|
|
+----------v-----------+
|                      |
|     BigQuery         |
|                      |
+----------+-----------+
|
|
+-------------v-------------+
|                           |
|   Airflow DAG (Batch Job)  |
|        (Dataflow)         |
+-------------+-------------+
|
|
+-------------v-------------+
|                           |
|   Airflow DAG (Batch Job)  |
|        (Dataflow)         |
+-------------+-------------+

流式传输管道(Dataflow(的示例代码:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Define pipeline options
pipeline_options = PipelineOptions()
# Define pipeline logic
def process_streaming_data(data):
# Process streaming data here
# Store data into BigQuery
# Create the pipeline
with beam.Pipeline(options=pipeline_options) as p:
# Read streaming data source
streaming_data = p | beam.io.ReadFromSource(...)

# Apply processing logic
processed_data = streaming_data | beam.ParDo(process_streaming_data)
# Write to BigQuery
processed_data | beam.io.WriteToBigQuery(...)

批处理管道(Dataflow(的示例代码:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Define pipeline options
pipeline_options = PipelineOptions()
# Define pipeline logic
def process_batch_data(data):
# Process batch data here
# Create the pipeline
with beam.Pipeline(options=pipeline_options) as p:
# Read data from BigQuery
batch_data = p | beam.io.ReadFromBigQuery(...)

# Apply processing logic
processed_data = batch_data | beam.ParDo(process_batch_data)
# Write results to a sink (e.g., BigQuery, GCS, etc.)
processed_data | beam.io.WriteTo...
# Trigger the next batch job
p.run().wait_until_finish()

第一个气流DAG的示例代码:

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime
# Define DAG configuration
dag_config = {
'start_date': datetime(2023, 6, 1),
'schedule_interval': '@daily',
}
# Define the DAG
dag = DAG('streaming_batch_pipeline', default_args=dag_config)
# Define the Dataflow task
streaming_pipeline_task = DataFlowPythonOperator(
task_id='streaming_pipeline_task',
py_file='<path_to_streaming_pipeline_file>',
options={'streaming': True},
dag=dag
)
# Define the trigger for the batch pipeline
trigger_batch_pipeline_task = DataFlow
PythonOperator(
task_id='trigger_batch_pipeline_task',
py_file='<path_to_batch_pipeline_file>',
options={'streaming': False},
dag=dag
)
# Set task dependencies
streaming_pipeline_task >> trigger_batch_pipeline_task

第二个气流DAG的示例代码(由第一个DAG触发(:

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime
# Define DAG configuration
dag_config = {
'start_date': datetime(2023, 6, 1),
'schedule_interval': '@daily',
}
# Define the DAG
dag = DAG('batch_pipeline', default_args=dag_config)
# Define the Dataflow task
batch_pipeline_task = DataFlowPythonOperator(
task_id='batch_pipeline_task',
py_file='<path_to_batch_pipeline_file>',
options={'streaming': False},
dag=dag
)

请注意,提供的示例代码是一个简化的表示,您需要填写特定于您的用例的实际逻辑和配置,例如数据源详细信息、BigQuery表模式、管道选项等。

最新更新