受 BigQuery 查询影响的行数



我每天运行命令将新记录插入 BigQuery 表中,并希望记录每天插入的记录数。

我创建了一个包含SELECT查询和destination表的QueryJob对象。 我将write_disposition设置为WRITE_APPEND,以便将新数据追加到表中。

我找到了两个做类似事情的选项,但没有一个实现我想要的:

  • query_job.num_dml_affected_rows:这只返回 None,因为查询不使用 DMLINSERT,而是追加到目标表。
  • query_job.result().total_rows:返回表中的总行数,而不是新行数。

我可以想到各种达到预期结果的方法,但不确定最好的方法是什么:

  • 将查询更改为 DML 插入 - 但这意味着创建动态 SQL 而不是仅使用目标表是一个 python 变量。
  • 将结果转储到临时表中,计算行数,然后附加数据 - 但这似乎效率低下。
  • 计算查询前后的记录并记录增量 - 这可能会导致并行运行查询时出现问题。

任何建议没有最好的方法?


跟进马斯喀特的答案,我认为当查询并行运行时,这是行不通的:

Get number of rows: 1000 rows
Function to call queries in paralell:
- Query 1 --> Adds 100 rows --> finishes 3rd --> Counts 1200 rows
- Query 2 --> Adds 80 rows --> finishes 2nd --> Counts 1100 rows
- Query 3 --> Adds 20 rows --> finishes 1st --> Counts 1020 rows

因为无法知道这些查询将按哪个顺序完成(因为它们都是使用multiprocessing库并行调用的(,所以我不确定如何知道每个查询添加了多少行?

<小时 />

更新2

示例代码:

...
# We compile a list of which datasets need to be loaded from
brands = self._bq.select(f"Select brand,  gaDataset From {self.BRAND_DATASET}.{self.BRAND_TABLE}")
brands = list(brands.iterrows())
_, brands = zip(*brands)
# Define the function for parallel population
def populate_fn(brand):
return self._populate(brand, self.predicates)
logging.info("Populating daily stats for brands in parallel")
error = self._parallel_apply(populate_fn, brands)
if error is not None:
return error
def _populate(self, brand, predicates):
# We can't just call <bq_load_data> because we need to update the predicates for each brand
predicates.add_predicate('gaDataset', brand['gaDataset'], operator="_")
query_job = self._load_data(self.table_name, predicates=predicates)
logging.info(f"Started for {brand['gaDataset']}: {brand['brand']}")
self._run_query_job(query_job)
logging.info(f"{brand['gaDataset']}: {brand['brand']} is now populated.")

_populate功能为每个品牌并行运行。

predicates只是一个处理如何修改 Jinja 模板化 SQL 的对象,其中包含主对象中的一些常见参数,以及一些特定于品牌的参数。

_load_data是一个函数,它实际使用适当的参数加载 Jinja 模板化 SQL,并构造并返回一个QueryJob对象。

有效且推荐的方法是在运行查询之前和之后对记录进行计数。并行运行查询没有问题,因为我们可以等待查询作业完成,然后再检查更新的行数。我已经准备了如何检查新添加的行数的示例:

from google.cloud import bigquery
client = bigquery.Client()
# Define destination table.
table_id = "<PROJECT_ID>.<DATASET>.<TABLE>"
# Inspect the number of rows in the table before running the query.
table = client.get_table(table_id)
num_rows_begin = table.num_rows
print("Number of rows before running the query job: " + str(num_rows_begin))
sql = """
SELECT word, word_count
FROM `bigquery-public-data.samples.shakespeare`
LIMIT 10
"""
job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition="WRITE_APPEND")
# Make an API request.
query_job = client.query(sql, job_config=job_config)
# Wait for the job to complete.  
query_job.result()
# Inspect the number of newly added rows in the table after running the query.
# First way:
num_rows_end = query_job._query_results.total_rows - num_rows_begin
print("Loaded {} rows into {}".format(str(num_rows_end), table_id))
# Second way:
table = client.get_table(table_id)
print("Loaded {} rows into {}".format(table.num_rows - num_rows_begin, table_id))

如您所见,检查新添加的行数的方法很少。第一个是指query_job:query_job._query_results.total_rows的结果,这基本上和query_job.result().total_rows一样。第二种方法获取有关项目中数据集的信息。这里重要的是,我们需要在检查行数之前再次调用table = client.get_table(table_id)方法。如果我们不这样做,系统将打印:Loaded 0 rows into table,因为它指的是运行查询之前指定的行数。

我希望您发现上述信息有用。

作为替代方案: 您可以按原样在GBQ中完成所有这些操作,甚至不需要转到api方式。

示例:插入到某个表中并查找插入了多少行:

insert into `YOUR_TABLE_PATH` (col1, col2) select 'test', 'test'; 
select @@row_count;

这是所有系统变量的引用 https://cloud.google.com/bigquery/docs/reference/system-variables

您可以按上述方式获取所有受影响的行。

最新更新