我正试图从GCS中的CSV文件加载数据,但在Airflow中没有预定义的操作员来执行此操作。
我使用PSQL钩子和GCS文件读取器构建了一个简单的运算符,但我想知道是否有更好的解决方案,因为现在自定义运算符的工作方式是在一个循环上逐行运行;插入";打开GCS文件的语句。
是的,没有操作员将GCS的数据插入CLoud SQL,但您可以使用CloudSqlHook导入GCS文件。
这里有一个body的例子,它是一个包含文件行的dict,如果你的文件太大,你可以批量导入它(1k-10K行(,这比使用INSERT INTO
的循环要好得多。
您可以在用例中使用CloudSQLImportInstanceOperator。它将数据从云存储(CSV文件(导入到云SQL实例中。您可以访问此链接以获得有关此操作员的深入解释。
import datetime
from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.operators.cloud_sql import CloudSQLImportInstanceOperator
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PROJECT_ID = "project-id"
DAG_ID = "cloudsql"
#BUCKET_NAME = f"{DAG_ID}_{ENV_ID}_bucket"
INSTANCE_NAME = "instance-name"
import_body = {
"importContext": {
"uri": "gs://bucket/file.csv",
"fileType": "CSV",
"csvImportOptions": {
"table": "table",
"columns": [
"column1",
"column2"
]
},
"database": "guestbook",
"importUser": "postgres"
}
}
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with models.DAG(
'composer_quickstart',
catchup=False,
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
# Print the dag_run id from the Airflow logs
sql_import_task = CloudSQLImportInstanceOperator(
body=import_body, instance=INSTANCE_NAME, task_id='sql_import_task', project_id=PROJECT_ID)
sql_import_task