Python BigQuery Storage.并行读取多个流



我有以下玩具代码:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"
parent = "projects/{}".format(your_project_id)
session = client.create_read_session(
table_ref,
parent,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)
df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df

我使用平衡分片策略启动了 1 个以上的流,这些流可以独立读取。

BigqueryStorage 文档 说:

但是,如果您想扇出多个阅读器,您可以通过以下方式做到这一点 让读取器处理每个单独的流。

我启动了两个阅读器,会话中的每个流一个。之后,两个数据帧(每个读取器创建 1 个(连接成一个。然而,与LIQUID分片策略相比,这种方法并没有提供任何速度。

我试图让两个读者并行阅读行。但是,我未能在库文档中找到有关并行流读取的任何内容。

问题是:

1( 如果选择了平衡分片策略,BugQuery Storage 是否提供了同时读取多个流的原生方法?

2( 并行读取流的最佳方法是什么?我是否需要为此使用多处理或异步?

3(如果有人能提供有关并行流Reding的任何基本示例,我将不胜感激

BigQuery Storage API 确实支持多个流,但您的执行方法不支持。您可以创建多个读取器实例,然后每个实例都可以使用单独的流来提高吞吐量。

你有很多选择在python中进行并行处理。但是,最容易使用的一种是多处理包。

另一种选择是使用Apache Beam,它默认支持并行处理,但可能不适合您的用例。它有一个内置的 BigQuery IO 驱动程序,但它的 python 版本尚不支持 BigQuery Storage API,因此您可能需要为 BQ Storage API 编写自己的实现。

我做了一些研究,我意识到你使用了 BigQuery Storage API 中的代码,你是对的,如果你正在消费,使用平衡策略,多个流,需要提到的是它仍然处于测试版。

发生这种情况的一些原因是,也许您只看到 1 个流,因为流分配算法的数据相对较"小",流的数量可能低于请求的数量,具体取决于 2 个因素:表的合理并行性和服务的限制。目前,用于确定什么是"合理"的算法的详细信息尚未公开,并且一旦 API 达到正式发布阶段,这些细节可能会发生变化。

您也可以尝试上面推荐的多处理包。

您缺少requested_streams值:

n_streams = 2
session = client.create_read_session(
table_ref,
parent,
requested_streams=n_streams,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

您可以在一行中连接数据帧:

readers = []
for i in range(n_streams):
stream = session.streams[i]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)
readers.append(reader)
df = pd.concat([reader.to_dataframe(session) for reader in readers])

希望这有帮助。

最新更新