上传CSV文件到已分区的bigquery表(根据文件名生成分区)



我正在使用bigquery客户端对象将一些CSV文件(位于云存储中)上传到bigquery表中。

我成功地将数据上传到bigquery表中,但我想将目标表更改为分区表。分区将是文件名中的日期。

文件名

为CSV文件中的一列,与CSV文件名相同。这就是我如何从文件名中提取日期(假设文本是filename) date1稍后将用作我们的分区:

text = 'sales_2022-09-09T21-27-05_018787'
match = re.search(r'd{4}-d{2}-d{2}', text)
date1 = datetime.strptime(match.group(), '%Y-%m-%d').date()

,这是如何上传数据到BQ:

client = bigquery.Client.from_service_account_json(CREDENTIALS_LOCATION)
def upload_from_gcs_to_bq(project_id, dataset_id, gsutil_uri, table_name,gcs_blob):
table_id = project_id +'.'+ dataset_id +'.'+ table_name
uri = gsutil_uri + '/' + gcs_blob +'.csv'
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("filename", "STRING"),
bigquery.SchemaField("sales_category", "STRING"),
...
],
skip_leading_rows=1,
# time_partitioning=bigquery.TimePartitioning(
#     type_=bigquery.TimePartitioningType.DAY,
#     field="date",  # Name of the column to use for partitioning.
#     expiration_ms=7776000000,  # 90 days.
# ),
)    
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) 
load_job.result()  # Wait for the job to complete.
table = client.get_table(table_id)
def main():
upload_from_gcs_to_bq(project_id, dataset_id, gsutil_uri, table_name,gcs_blob)
if __name__ == '__main__':
main()

我认为最好利用外部表,因为您的数据已经存储在云存储中。

您可以通过直接读取CSV文件来创建外部表,永久的或临时的。

https://cloud.google.com/bigquery/docs/external-data-cloud-storage

然后将信息加载到按目标字段划分的表中。

如果您有分区文件,也有一个不错的选择,将它们加载为外部表,但您需要遵循云存储中的特定格式

https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs

相关内容

  • 没有找到相关文章

最新更新