BigQuery - LoadJobConfig vs. QueryJobConfig when updating a schema



My situation is as follows: I am trying to update the schema of a partitioned table (partitioned by a time-unit column). I used this article and this example as references, and the documentation says:

schemaUpdateOptions[]: Schema update options are supported in two cases: when writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination table is a partition of a table, specified by partition decorators. For normal tables, WRITE_TRUNCATE will always overwrite the schema.

So my understanding is that, using LoadJobConfig().schema_update_options = [bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION]:

  • For normal tables, depending on LoadJobConfig().write_disposition:
    • WRITE_APPEND: the schema is updated successfully and the new data is appended to the table.
    • WRITE_TRUNCATE: the schema is updated successfully, but the table is overwritten by the new data.

There are some subtle differences between the two classes, so I suggest you pay attention to each class's default configuration. If either one ends up returning an error, it is likely caused by an incorrect initialization of the configuration, and reviewing those defaults should help you resolve your problem.

QueryJobConfig

QueryJobConfig(**kwargs)

Configuration options for query jobs.

All properties in this class are optional. Values which are :data:`None` use the server defaults. Set properties on the constructed configuration by using the property name as the name of a keyword argument.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
# table_id = "your-project.your_dataset.your_table_name"

# Retrieves the destination table and checks the length of the schema.
table = client.get_table(table_id)  # Make an API request.
print("Table {} contains {} columns".format(table_id, len(table.schema)))

# Configures the query to append the results to a destination table,
# allowing field addition.
job_config = bigquery.QueryJobConfig(
    destination=table_id,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Start the query, passing in the extra configuration.
query_job = client.query(
    # In this example, the existing table contains only the 'full_name' and
    # 'age' columns, while the results of this query will contain an
    # additional 'favorite_color' column.
    'SELECT "Timmy" as full_name, 85 as age, "Blue" as favorite_color;',
    job_config=job_config,
)  # Make an API request.
query_job.result()  # Wait for the job to complete.

# Checks the updated length of the schema.
table = client.get_table(table_id)  # Make an API request.
print("Table {} now contains {} columns".format(table_id, len(table.schema)))

LoadJobConfig

LoadJobConfig(**kwargs)

Configuration options for load jobs.

Set properties on the constructed configuration by using the property name as the name of a keyword argument. Values which are unset or :data:`None` use the BigQuery REST API default values. See the BigQuery REST API reference documentation for a list of default values.

Required options differ based on the source_format value. For example, the BigQuery API's default value for source_format is "CSV". When loading a CSV file, either schema must be set or autodetect must be set to :data:`True`.

# from google.cloud import bigquery
# client = bigquery.Client()
# project = client.project
# dataset_ref = bigquery.DatasetReference(project, 'my_dataset')
# filepath = 'path/to/your_file.csv'

# Retrieves the destination table and checks the length of the schema.
table_id = "my_table"
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
print("Table {} contains {} columns.".format(table_id, len(table.schema)))

# Configures the load job to append the data to the destination table,
# allowing field addition.
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
job_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]
# In this example, the existing table contains only the 'full_name' column.
# 'REQUIRED' fields cannot be added to an existing schema, so the
# additional column must be 'NULLABLE'.
job_config.schema = [
    bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
]
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1

with open(filepath, "rb") as source_file:
    job = client.load_table_from_file(
        source_file,
        table_ref,
        location="US",  # Must match the destination dataset location.
        job_config=job_config,
    )  # API request
job.result()  # Waits for table load to complete.

print(
    "Loaded {} rows into {}:{}.".format(
        job.output_rows, dataset_ref.dataset_id, table_ref.table_id
    )
)

# Checks the updated length of the schema.
table = client.get_table(table_ref)
print("Table {} now contains {} columns.".format(table_id, len(table.schema)))

It should be noted that google.cloud.bigquery.job.SchemaUpdateOption is shared by both classes: it specifies the updates to the destination table schema that are allowed as a side effect of the job.
