我想用Airflow创建一个BigQuery表(通过BigQuery钩子或BigQuery空表创建者(。不幸的是,无法使用Range分区创建。
有人提出了一个PR,但为了尽量减少气流操作员界面,他们建议跳过它。他们给出了这样的解决方案。
you can use the table_resource argument to pass any table definition you want, no need to specify every single parameter.
我不清楚,在气流中,我如何使用与Range分区相关的JSON。有人能给我举一个如何使用/实现它的例子吗?
我尝试了以下操作,但它创建了没有分区的表。(这是用于时间划分的,但BQ运算符对此有一个参数,但我想尝试使用table_resource
。
resource="""{
"TimePartitioning": {
"type": "DAY",
"field": "created_at"
}
}"""
schema="""[{"mode": "REQUIRED", "name": "code", "type": "STRING"},
{"mode": "REQUIRED", "name": "created_at", "type": "DATETIME"},
{"mode": "REQUIRED", "name": "service_name", "type": "STRING"}]"""
def bq_create(**kwargs):
table_schema = 'bhuvi'
table_name = 'sampletable'
create = BigQueryCreateEmptyTableOperator (
task_id='create_bq_{}'.format(table_name),
project_id = 'myproject',
dataset_id=table_schema,
table_id=table_name,
schema_fields=json.loads(schema),
bigquery_conn_id='bigquery_default',
table_resource =json.loads(resource)
)
create.execute(context=kwargs)
bqcreate = PythonOperator(
task_id="bqcreate",
python_callable=bq_create,
provide_context=True,
dag=dag
)
bqcreate
根据文档,时间分区的正确键是timePartitioning
,这不是问题所在吗?https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table