大查询:将数据导出到分层文件夹:年/月/日



我在 BigQuery 中有一个日期分区表,我想导出它。 我想导出它,以便每天的数据最终出现在不同的文件中。 例如,到具有嵌套文件夹结构的 GS 存储桶,如gs://my-bucket/YYYY/MM/DD/. 这可能吗?

请不要告诉我我需要为每天的数据运行单独的导出作业:我知道这是可能的,但在导出多年的数据时很痛苦,因为您需要运行数千个导出作业。

在导入方面,这可以通过镶木地板格式来实现。

如果直接使用 BigQuery 无法做到这一点,是否有像 dataproc 或 dataflow 这样的 GCS 工具可以简化此操作(链接到实际执行此导出的脚本的加分(。

带有 bq 提取的 bash 脚本会起作用吗?

#!/bin/bash
# Stop on first error
set -e;
# Used for Bigquery partitioning (to distinguish from bash variable reference)
DOLLAR="$"
# -I ISO DATE
# -d FROM STRING
start=$(date -I -d 2019-06-01) || exit -1
end=$(date -I -d 2019-06-15)   || exit -1
d=${start}
# string(d) <= string(end)
while [[ ! "$d" > "$end" ]]; do
YYYYMMDD=$(date -d ${d} +"%Y%m%d")
YYYY=$(date -d ${d} +"%Y")
MM=$(date -d ${d} +"%m")
DD=$(date -d ${d} +"%d")
# print current date
echo ${d}
cmd="bq extract --destination_format=AVRO 
'project:dataset.table${DOLLAR}${YYYYMMDD}' 
'gs://my-bucket/${YYYY}/${MM}/${DD}/part*.avro'
"
# execute    
eval ${cmd}
# d++
d=$(date -I -d "$d + 1 day")
done

也许您应该在 https://issuetracker.google.com/savedsearches/559654 请求新功能。

不是狂欢忍者,所以确保有一种更酷的方式来比较日期。

应 @Ben P 的要求,这是我之前用来并行运行大量导出作业的解决方案(python 脚本(。 这是非常粗略的代码,应该通过在运行后检查每个导出作业的状态以查看它是否成功来改进。

我不会接受这个答案,因为问题是寻找一种 bigquery 原生的方式来执行此任务。

请注意,此脚本用于导出版本化数据集,因此围绕许多用户可能不需要的额外逻辑。 它假定输入表和输出文件夹名称都使用版本。 这应该很容易去除。

import argparse
import datetime as dt
from google.cloud import bigquery
from multiprocessing import Pool
import random
import time
GCS_EXPORT_BUCKET = "YOUR_BUCKET_HERE"
VERSION = "dataset_v1"
def export_date(export_dt, bucket=GCS_EXPORT_BUCKET, version=VERSION):
table_id = '{}${:%Y%m%d}'.format(version, export_dt)
gcs_filename = '{}/{:%Y/%m/%d}/{}-*.jsonlines.gz'.format(version, export_dt, table_id)
gcs_path = 'gs://{}/{}'.format(bucket, gcs_filename)
job_id = export_data_to_gcs(table_id, gcs_path, 'currents')
return (export_dt, job_id)
def export_data_to_gcs(table_id, destination_gcs_path, dataset):
bigquery_client = bigquery.Client()
dataset_ref = bigquery_client.dataset(dataset)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = 'NEWLINE_DELIMITED_JSON'
job_config.compression = 'GZIP'
job_id = 'export-{}-{:%Y%m%d%H%M%S}'.format(table_id.replace('$', '--'),
dt.datetime.utcnow())
# Add a bit of jitter
time.sleep(5 * random.random())
job = bigquery_client.extract_table(table_ref,
destination_gcs_path,
job_config=job_config,
job_id=job_id)
print(f'Now running job_id {job_id}')
time.sleep(50)
job.reload()
while job.running():
time.sleep(10)
job.reload()
return job_id

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-s', "--startdate",
help="The Start Date - format YYYY-MM-DD (Inclusive)",
required=True,
type=dt.date.fromisoformat)
parser.add_argument('-e', "--enddate",
help="The End Date format YYYY-MM-DD (Exclusive)",
required=True,
type=dt.date.fromisoformat)
args = parser.parse_args()
start_date = args.startdate
end_date = args.enddate
dates = []
while start_date < end_date:
dates.append(start_date)
start_date += dt.timedelta(days=1)
with Pool(processes=30) as pool:
jobs = pool.map(export_date, dates, chunksize=1)

若要运行此代码,请将其放入名为bq_exporter.py的文件中,然后运行python bq_exporter.py -s 2019-01-01 -e 2019-02-01。 这将导出 2019 年 1 月,并打印每个导出作业的 ID。 您可以通过bq show -j JOB_ID使用 BigQuery CLI 检查作业的状态。

最新更新