如何从airflow中使用本地bq操作符实现python任务



在处理项目时遇到了一种情况,即我们希望使用airflow执行一些任务,但不允许使用python运算符,而是指示使用本地BigQuery运算符。有人能帮我写这样的代码吗?比如设置气流变量,或者如何编写将由BQ运算符执行的条件代码。有可能吗?如果不是,那么我的下一个问题是,是否有可能使用BQ运算符从BQ表中获取结果,并将其分配给一个python变量,所以想要同时使用BQ和python运算符,有什么方法吗?

因此,以下是如何使用BigQuery运算符并使用交叉通信xcom_pull将数据发送到另一个任务的示例。

您可以使用BigQueryGetDataOperatorBigQueryOperator通过自定义查询来查询数据。这些操作符将为您返回一个列表,这样您就可以在另一个任务中获得它。我在示例中的bash运算符中使用了它:

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryGetDataOperator,
)
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
BQ_LOCATION = "europe-north1"
TABLE_NAME="<table-name>"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")

with models.DAG(
"example_bigquery_operations",
schedule_interval='@once',  # Override to match your needs
start_date=days_ago(1),
tags=["example"],
) as dag:
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
max_results=1,
selected_fields="name",
#location=BQ_LOCATION,
)

get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo "{{ task_instance.xcom_pull('get_data') }}"",
)

get_data >> get_dataset_result
[2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
[2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_bigquery_operations
AIRFLOW_CTX_TASK_ID=get_dataset_result
AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
[2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location: 
/tmp
[2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[['Tom']]"']
[2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
[2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
[2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0

最新更新