I use the following Python GCP Cloud Function to load CSV files from a GCS bucket into a BigQuery table.
from typing import List

from google.cloud import bigquery


def csv_in_gcs_to_table(bucket_name: str, object_name: str, dataset_id: str,
                        table_id: str,
                        schema: List[bigquery.SchemaField]) -> None:
    """Upload a CSV file to a BigQuery table.

    If the table already exists, its data is overwritten.

    Args:
        bucket_name: Bucket holding the object.
        object_name: Name of the object to be loaded.
        dataset_id: Dataset id where the table is located.
        table_id: String holding the id of the table.
        schema: Schema of the table.
    """
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    uri = "gs://{}/{}".format(bucket_name, object_name)
    load_job = client.load_table_from_uri(uri,
                                          dataset_ref.table(table_id),
                                          job_config=job_config)
    load_job.result()
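For reference, a call might look like the following; the bucket, object, dataset, and table names here are placeholders:

example_schema = [
    bigquery.SchemaField('col1', 'STRING'),
    bigquery.SchemaField('col2', 'STRING'),
]
# Load gs://my-bucket/uploads/data.csv into my_dataset.my_table.
csv_in_gcs_to_table('my-bucket', 'uploads/data.csv', 'my_dataset',
                    'my_table', example_schema)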
The function is triggered whenever a new file lands in the bucket, and it loads the file matching the object_name parameter. I want the load to pick up the file that was last uploaded to the bucket, in other words the file that triggered the event. My question is how to achieve this.
Following @FrankvanPuffelen's suggestion, I adjusted the function to capture the event's file name. The event argument that gets passed in contains all of the event variables, including the name of the file that triggered the event.
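For a GCS-triggered function, the event dict carries at least the bucket and object name; a trimmed sketch of the payload (values are placeholders, other fields omitted) looks like this:

# Sketch of the event dict a storage trigger delivers to the function.
event = {
    'bucket': 'bucket_name',         # bucket that received the file
    'name': 'uploads/new_file.csv',  # object that triggered the event
}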
from google.cloud import bigquery


def csv_in_gcs_to_table(event, context):
    client = bigquery.Client()
    # The event dict describes the object that triggered the function;
    # event['name'] is the name of the newly uploaded file.
    bucket_name = "bucket_name"  # could also be read from event['bucket']
    object_name = event['name']
    table_id = "project_id.dataset_name.table_name"
    schema = [
        bigquery.SchemaField('col1', 'STRING'),
        bigquery.SchemaField('col2', 'STRING'),
    ]
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.skip_leading_rows = 1
    uri = "gs://{}/{}".format(bucket_name, object_name)
    load_job = client.load_table_from_uri(uri,
                                          table_id,
                                          job_config=job_config)
    load_job.result()
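To sanity-check the function outside of Cloud Functions, it can be called directly with a mock event; the context argument is unused here, so None suffices (the file name below is a placeholder):

if __name__ == '__main__':
    # Simulate the storage event for a file already present in the bucket.
    mock_event = {'bucket': 'bucket_name', 'name': 'uploads/new_file.csv'}
    csv_in_gcs_to_table(mock_event, None)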