BigQuery Storage API并行运行流



我正在尝试使用BigQuery Storage API获取一个巨大的BigQuery表。目前,我正在使用一个流按顺序获取数据。该程序将在使用数十个虚拟CPU的服务器上运行,因此我希望并行获取表以获得性能。我使用的bq存储版本是google.cloud.bigquery.storage.v1我在这篇文章中看到,为了并行计算多个流,可以指定一个BALANCED的分片策略,但在v1中似乎不存在。

这个选项似乎存在于v1_beta中,但我在repo的代码中找不到它。这个选项还存在吗?或者,否则我如何实现并行会话?

from google.cloud.bigquery_storage import types
from google.cloud import bigquery_storage
def get_df_parallel():
num_cores = 12
bqclient = BigQueryClient()
bqstorageclient = bigquery_storage.BigQueryReadClient(credentials=CREDENTIALS)
stringify_table = f"..."
parent = "projects/{}".format(VARIABLES['PROJECT_ID'])

requested_session = types.ReadSession(
table=stringify_table,
data_format=types.DataFormat.ARROW,
)
read_session = bqstorageclient.create_read_session(
parent=parent, 
read_session=requested_session, 
max_stream_count=num_cores,
# module 'google.cloud.bigquery_storage_v1.gapic_types' has no attribute 'ShardingStrategy'
sharding_strategy=(types.ShardingStrategy.BALANCED),
)
readers = []
for stream in read_session.streams:
position = bigquery_storage.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)
readers.append(reader)
df = pd.concat([reader.to_dataframe(session) for reader in readers])
return df

bigquery_storage_v1beta1已经在弃用过程中,并且删除了对它的支持,因此不存在python文档。尽管它仍然是可访问的,如果您将遵循v1beta1 RPC引用,并且如果导入,ShardingStrategy仍然存在于v1beta1中。但是,最好避免使用它,因为图书馆在不久的将来会使用它。

v1beta1neneneba API尚未正式弃用,将通过一个完整的折旧周期(https://cloud.google.com/products#product-发射阶段(服务被拒绝。但是,新代码应该使用v1 API向前地

根据BigQuery存储v1 python文档,您可以使用多处理。进程以创建并行进程。您可以尝试实现此方法。

因为此客户端使用grpcio库,所以共享实例是安全的跨线程。在多处理场景中,最佳做法是通过多处理。池或多处理。过程

最新更新