Getting the BigQuery temp table "destination table" using the Python API



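My use case is to create a temporary table, load it with data from a SELECT query, and then extract that table to Cloud Storage as CSV using the Python API.

I can create and load the temporary table with a query job, but I can't work out the "destination table" from the job response, which I need in order to export to Cloud Storage.

Here is the code: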

import uuid

from google.cloud import bigquery
bq_key = settings.BASE_DIR + '/api_keys/storage_bq_admin.json'
bq_client = bigquery.Client.from_service_account_json(bq_key, project='my-project-id')
query = """
EXECUTE IMMEDIATE
"CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `project-id.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = '123456' AND inserted_at > '2020-09-17 00:59:11.461')"
"""
query_job = bq_client.query(query, job_id="segment_temp_%s" % str(uuid.uuid4()))  # Make an API request.
results = query_job.result()  # Waits for job to complete.
bq_job_id = query_job.job_id
print(query_job.__dict__)

query_job returns:

{'_retry': <google.api_core.retry.Retry object at 0x7fdc41758748>, '_result': <google.cloud.bigquery.job.QueryJob object at 0x7fdc3ca682e8>, '_exception': None, '_result_set': True, '_polling_thread': None, '_done_callbacks': [], '_properties': {'kind': 'bigquery#job', 'etag': '3uEKLSpG6pZPeLsnzA==', 'id': 'pid-107805:US.segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/pid-107805/jobs/segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56?location=US', 'user_email': '', 'configuration': {'query': {'query': '\n            EXECUTE IMMEDIATE\n      "CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `pid.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = \'cl3666dnx3klmb\' AND inserted_at > \'2020-09-17 00:59:11.461\')"\n            ', 'priority': 'INTERACTIVE', 'useLegacySql': False}, 'jobType': 'QUERY'}, 'jobReference': {'projectId': 'pid-107805', 'jobId': 'segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56', 'location': 'US'}, 'statistics': {'creationTime': 1600359344198.0, 'startTime': 1600359344308.0, 'endTime': 1600359346615.0, 'totalBytesProcessed': '1292600', 'query': {'totalBytesProcessed': '1292600', 'totalBytesBilled': '10485760', 'totalSlotMs': '6637', 'statementType': 'SCRIPT'}, 'totalSlotMs': '6637', 'numChildJobs': '1', 'scriptStatistics': {}}, 'status': {'state': 'DONE'}}, '_client': <google.cloud.bigquery.client.Client object at 0x7fdc42448588>, '_completion_lock': <unlocked _thread.lock object at 0x7fdc42355d00>, '_configuration': <google.cloud.bigquery.job.QueryJobConfig object at 0x7fdc423d8fd0>, '_query_results': <google.cloud.bigquery.query._QueryResults object at 0x7fdc42467da0>, '_done_timeout': None, '_transport_timeout': None}

Looking up the same job ID in the API Explorer from the documentation returns:

{
"kind": "bigquery#job",
"etag": "3uEKLSpg961G6pZPeA==",
"id": "pid-107805:US.segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56",
"selfLink": "https://content-bigquery.googleapis.com/bigquery/v2/projects/pid-107805/jobs/segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56?location=US",
"user_email": "storage-bq-admin@pid-107805.iam.gserviceaccount.com",
"configuration": {
"query": {
"query": "\n            EXECUTE IMMEDIATE\n      \"CREATE TEMP TABLE segusers1 (user_id STRING, client_id STRING, inserted_at TIMESTAMP) AS SELECT user_id,client_id,inserted_at FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY inserted_at DESC ) AS top FROM `pid-107805.prod.users_partition_by_client` WHERE partition_id = 3666 AND client_id = 'cl3666dnx3klmb' AND inserted_at \u003e '2020-09-17 00:59:11.461')\"\n            ",
"priority": "INTERACTIVE",
"useLegacySql": false
},
"jobType": "QUERY"
},
"jobReference": {
"projectId": "pid-107805",
"jobId": "segment_temp_3f7e533f-eb64-427f-bbb6-d3e31d78ca56",
"location": "US"
},
"statistics": {
"creationTime": "1600359344198",
"startTime": "1600359344308",
"endTime": "1600359346615",
"totalBytesProcessed": "1292600",
"query": {
"totalBytesProcessed": "1292600",
"totalBytesBilled": "10485760",
"totalSlotMs": "6637",
"statementType": "SCRIPT"
},
"totalSlotMs": "6637",
"numChildJobs": "1",
"scriptStatistics": {}
},
"status": {
"state": "DONE"
}
}

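Neither response contains the destinationTable details I need. I expected the query job to include the datasetId of the newly created table. I'm not sure what I'm missing here.

This is something of a misuse of EXECUTE IMMEDIATE and CREATE TEMP TABLE in your code. With them, the job runs as a script (note "statementType": "SCRIPT" and "numChildJobs": "1" in the responses above), and the parent script job does not report a destinationTable.

If you remove both, your query becomes a plain SELECT: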

query = """
SELECT user_id,client_id,inserted_at ...
"""

You will then be able to find the destination table in the job, similar to the following:

{
"configuration": {
"jobType": "QUERY", 
"query": {
"destinationTable": {     <======== what you're looking for
"datasetId": "_c53c0a2640dc04748b94ebc5d7193a6976b85fa1", 
"projectId": "yourProject", 
"tableId": "anon8b75560af5d60d88fd40befe1371bb83696c86e1"
}, 
...
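
For the export step, here is a minimal sketch of how the pieces might fit together in Python. It assumes the query is a plain SELECT as described above, that `client` is a `google.cloud.bigquery.Client`, and that the anonymous result table can be read by the same credentials that ran the query. The function name `export_query_to_gcs` and the bucket URI are hypothetical; `QueryJob.destination` and `Client.extract_table` are the library calls it relies on.

```python
def export_query_to_gcs(client, sql, gcs_uri):
    """Run a plain SELECT and extract its result table to Cloud Storage.

    `client` is expected to be a google.cloud.bigquery.Client and `gcs_uri`
    a placeholder such as "gs://my-bucket/segment.csv" (both assumptions).
    """
    query_job = client.query(sql)
    query_job.result()  # Wait for the query to finish.

    # For a plain SELECT with no explicit destination, this is the anonymous
    # table reported under configuration.query.destinationTable.
    destination = query_job.destination
    if destination is None:
        raise RuntimeError(
            "Query job %s has no destination table" % query_job.job_id
        )

    # Extract jobs write CSV unless another format is configured.
    extract_job = client.extract_table(
        destination, gcs_uri, location=query_job.location
    )
    extract_job.result()  # Wait for the export to finish.
    return destination


# Hypothetical usage, reusing bq_client from the question:
#   table_ref = export_query_to_gcs(
#       bq_client,
#       "SELECT user_id, client_id, inserted_at FROM ...",
#       "gs://my-bucket/segment.csv",
#   )
#   print(table_ref.dataset_id, table_ref.table_id)
```

Passing the client in, rather than building it inside the function, keeps the credentials handling from the question unchanged and makes the helper easy to exercise with a stub client.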
