我可以从云功能触发气流任务吗?
基本上我的问题是这个。我有一些文件到达谷歌云存储。同一DAG中的多个文件。我需要在文件到达时触发转换作业。我在考虑使用云功能。但在我的DAG中有很多依赖性的工作。
感谢提供的任何帮助
您不一定需要Cloud Function来感知GCS中的文件,Composer具有可用于实现此目的的GCS传感器。
假设您必须监视bucket/folder/file_*.csv中的文件,然后:
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
import datetime as dt
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
lasthour = dt.datetime.now() - dt.timedelta(hours=1)
args = {
'owner': 'airflow',
'start_date': lasthour,
'depends_on_past': False,
}
dag = DAG(
dag_id='GCS_sensor_dag',
schedule_interval=None,
default_args=args
)
GCS_File_list = GoogleCloudStorageListOperator(
task_id= 'list_Files',
bucket= 'bucketname',
prefix='folder/file_',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default',
dag = dag
)
file_sensor = GoogleCloudStoragePrefixSensor(
task_id='gcs_polling',
bucket='bucketname',
prefix='folder/file_',
dag=dag
)
trigger = TriggerDagRunOperator(
task_id='trigger_dag_{timestamp}_rerun'.format(timestamp=((dt.datetime.now() - dt.datetime.utcfromtimestamp(0)).total_seconds()*1000)),
trigger_dag_id="GCS_sensor_dag",
dag=dag
)
file_sensor >> GCS_File_list >> trigger
您可以触发DAG来响应云存储桶中的更改。为了实现这一点,Cloud Composer DAG可以由Cloud函数触发。已经有很好的官方文档和Codelabs来描述工作流程。它将按以下方式工作:
- 上传一个文件到云存储桶,这将
- 使用Python/Node.JS运行时触发云函数
- 此函数将在Cloud Composer中执行DAG
记住一件事。当您将处于Creating your function
步骤时。您需要填写这一行:const WEBSERVER_ID = 'your-tenant-project-id';
。要检索该变量,请转到Airflow UI,而不是Admin -> Configuration
,然后搜索base_url
键,即webserver-id
(不包括https://
和.appspot.com
部分(。
另一种方法是使用以下命令:
gcloud composer environments describe <ENVIRONMENT_NAME> --location <LOCATION>
您将能够看到config:->airflowUri
变量。
我试过一次这种场景,效果很好。请随意提出更多问题。我希望你觉得以上信息有用。
我认为Cloud函数和GoogleCloudStoragePrefixSensor之间存在差异。只要发生任何事件(如文件到达(,云功能就会触发。这样就可以保证只拾取最新的文件。相比之下,GoogleCloudStoragePrefixSensor在预定义的时间运行(作为DAG的一部分(,并在给定的位置拾取匹配前缀的EXISTING文件。例如,在日常工作中,今天的文件到达并进行了处理。若那个文件继续放在那个里,那个么同样的文件可能会被这个传感器拾取。因此,在应用时需要小心。当然,你可以使用插件智能来获取最新的文件,并在处理后归档文件,除此之外,其他都必须添加。
为了使所描述的过程自动化,我们可以将Google Cloud Dataflow用于流式处理和批处理管道,将Google BigQuery用于数据存储,将Apache Airflow用于工作流管理。以下是逻辑概述,然后是流式处理和批处理管道的图表和示例代码,以及第一个和第二个Airflow DAG。
逻辑:
- 流式数据流管道不断地获取数据,并将其近乎实时地存储到BigQuery中
- 在BigQuery中存储数据后,会向Airflow DAG发送一个触发器,以启动批处理数据流管道
- 批处理数据流管道处理加载到BigQuery中的数据
- 一旦批处理作业完成,它将触发下一个批处理作业,并且该过程将继续
图表:
+------------------+
| Streaming Pipeline |
| (Dataflow) |
+--------+---------+
|
|
+----------v-----------+
| |
| BigQuery |
| |
+----------+-----------+
|
|
+-------------v-------------+
| |
| Airflow DAG (Batch Job) |
| (Dataflow) |
+-------------+-------------+
|
|
+-------------v-------------+
| |
| Airflow DAG (Batch Job) |
| (Dataflow) |
+-------------+-------------+
流式传输管道(Dataflow(的示例代码:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Define pipeline options
pipeline_options = PipelineOptions()
# Define pipeline logic
def process_streaming_data(data):
# Process streaming data here
# Store data into BigQuery
# Create the pipeline
with beam.Pipeline(options=pipeline_options) as p:
# Read streaming data source
streaming_data = p | beam.io.ReadFromSource(...)
# Apply processing logic
processed_data = streaming_data | beam.ParDo(process_streaming_data)
# Write to BigQuery
processed_data | beam.io.WriteToBigQuery(...)
批处理管道(Dataflow(的示例代码:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Define pipeline options
pipeline_options = PipelineOptions()
# Define pipeline logic
def process_batch_data(data):
# Process batch data here
# Create the pipeline
with beam.Pipeline(options=pipeline_options) as p:
# Read data from BigQuery
batch_data = p | beam.io.ReadFromBigQuery(...)
# Apply processing logic
processed_data = batch_data | beam.ParDo(process_batch_data)
# Write results to a sink (e.g., BigQuery, GCS, etc.)
processed_data | beam.io.WriteTo...
# Trigger the next batch job
p.run().wait_until_finish()
第一个气流DAG的示例代码:
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime
# Define DAG configuration
dag_config = {
'start_date': datetime(2023, 6, 1),
'schedule_interval': '@daily',
}
# Define the DAG
dag = DAG('streaming_batch_pipeline', default_args=dag_config)
# Define the Dataflow task
streaming_pipeline_task = DataFlowPythonOperator(
task_id='streaming_pipeline_task',
py_file='<path_to_streaming_pipeline_file>',
options={'streaming': True},
dag=dag
)
# Define the trigger for the batch pipeline
trigger_batch_pipeline_task = DataFlow
PythonOperator(
task_id='trigger_batch_pipeline_task',
py_file='<path_to_batch_pipeline_file>',
options={'streaming': False},
dag=dag
)
# Set task dependencies
streaming_pipeline_task >> trigger_batch_pipeline_task
第二个气流DAG的示例代码(由第一个DAG触发(:
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from datetime import datetime
# Define DAG configuration
dag_config = {
'start_date': datetime(2023, 6, 1),
'schedule_interval': '@daily',
}
# Define the DAG
dag = DAG('batch_pipeline', default_args=dag_config)
# Define the Dataflow task
batch_pipeline_task = DataFlowPythonOperator(
task_id='batch_pipeline_task',
py_file='<path_to_batch_pipeline_file>',
options={'streaming': False},
dag=dag
)
请注意,提供的示例代码是一个简化的表示,您需要填写特定于您的用例的实际逻辑和配置,例如数据源详细信息、BigQuery表模式、管道选项等。