Bigquery Storage API Multiprocessing segfault



长期读者,首次发布。我正在使用BigQuery Storage API Python客户端库,并且在使用Python多处理拆分读者时遇到了一些问题。

文件中有一条注释,上面写着:

因为此客户端使用grpcio库,所以共享实例是安全的跨线程。在多处理场景中,最佳做法是通过多处理。池或多处理。过程

我认为我做得对。。。但我一定不是。

这是我目前的代码。目标是在多个并行流中读取BQ表,然后将数据行写入单个CSV文件。创建完所有CSV文件后,我将执行一个简单的cat命令来组合它们。

附带说明一下,此代码实际上适用于小型BigQuery表,但在尝试下载大型BQ表时,由于segfault而失败。

import faulthandler
faulthandler.enable()
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
import multiprocessing as mp
import psutil
import os
import sys
import csv
from datetime import datetime

def extract_table(i):
client_in = BigQueryReadClient()
reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)
rows = reader_in.rows(session)
csv_file = "/home/user/sas/" + table_name + "_" + str(i) + ".csv"
print(f"Starting at time {datetime.now()} for file {csv_file}")
try:
with open(csv_file, 'w') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
if i == 0:
writer.writeheader()
else:
pass
for data in rows:
# print(data)
writer.writerow(data)
except IOError:
print("I/O error")
print(f"Finished at time {datetime.now()} for file {csv_file}")
return

if __name__ == '__main__':
# Get input args
project_id = sys.argv[1]
db_name = sys.argv[2]
table_name = sys.argv[3]
n = len(sys.argv[4])
a = sys.argv[4][1:n - 1]
csv_columns = a.replace("'", '').split(', ')
output_type = sys.argv[5]  # csv or sas
bucket_root = sys.argv[6]
# The read session is created in this project. This project can be
# different from that which contains the table.
client = BigQueryReadClient()
table = "projects/{}/datasets/{}/tables/{}".format(
project_id, db_name, table_name
)
requested_session = types.ReadSession()
requested_session.table = table

# This API can also deliver data serialized in Apache Arrow format.
# This example leverages Apache Avro.
requested_session.data_format = types.DataFormat.AVRO
# We limit the output columns to a subset of those allowed in the table
requested_session.read_options.selected_fields = csv_columns

ncpus = psutil.cpu_count(logical=False)
if ncpus <= 2:
ncpus_buffer = 2
else:
ncpus_buffer = ncpus - 2
print(f"You have {ncpus} cores according to psutil. Using {ncpus_buffer} cores")
parent = "projects/{}".format(project_id)
session = client.create_read_session(
parent=parent,
read_session=requested_session,
max_stream_count=ncpus_buffer,
)
print(f"There are {len(session.streams)} streams")
num_streams = int(len(session.streams))
with mp.Pool(processes=ncpus_buffer) as p:
result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

使用以下命令样式调用代码:

python /home/user/sas/bq_extract_2.py gc-project-id dataset table "['column1', 'column2']" csv 'path/to/gcs/bucket'

同样,这适用于小表,有几次我让它适用于50-100GB大小范围内的非常大的BQ表。然而,大多数情况下,大型表格都会出现以下错误:

有1000个流根据psutil,您有2个核心。使用2个内核从时间2020-11-17 17:46:0.4645398开始用于文件/home/user/sas/diag_0.csv

从时间2020-11-17开始17:46:04.829381,适用于file/home/user/sas/degra_1.csv

Python致命错误:分段错误

线程0x00007f4293f94700(最近的调用优先):文件"home/user/anaconda3/envs/sas controller/lib/python3.8/site packages/grpc/_channel.py";,channel_spin文件中的第1235行"home/user/anaconda3/envs/sas controller/lib/python3.8/threading.py";,运行文件中的第870行"home/user/anaconda3/envs/sas controller/lib/python3.8/threading.py";,_bootstrap_inner文件中的第932行"home/user/anaconda3/envs/sas controller/lib/python3.8/threading.py";,基带中的第890行

线程0x00007f42bc4c9740(最近一次调用优先):文件"home/user/anaconda3/envs/sas controller/lib/python3.8/csv.py";,_dict_to_list文件中的第151行"home/user/anaconda3/envs/sas controller/lib/python3.8/csv.py";,写入行文件"中的第154行/home/user/sas/bq_extract_2.py";,线extract_table文件中的39"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/pool.py";,mapstar文件中的第48行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/pool.py";,worker文件中的第125行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/process.py";,运行文件中的第108行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/process.py";,bootstrap文件中的第315行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/popen_fork.py";,启动文件中的第75行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/popen_fork.py";,init文件中的第19行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/context.py";,_Popen文件中的第277行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/process.py";,起始文件中的第121行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/pool.py";,_repopulate_pool_static文件中的第326行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/pool.py";,_repopulate_pool文件中的第303行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/pool.py";,init文件中的第212行"home/user/anaconda3/envs/sas controller/lib/python3.8/multiprocessing/context.py";,池文件中的第119行"/home/user/sas/bq_extract_2.py";,第157行模块内

编辑1:将.read_rows上的超时更新为10000,以允许从BQ中读取大量结果。还将max_stream_count更改为等于池将使用的核心数。这似乎对我的测试有很大帮助,但当我在谷歌云计算实例上作为启动脚本运行时,控制台输出中仍然会显示segfault。

编辑2:我对此研究得越多,就越不可能有效地使用Google BigQuery Storage API的Python多处理。考虑到在调用os.fork()之后需要创建读取会话,我无法确保为各个进程分配正确的读取行数。每个会话都在与所附的BQ表创建自己的一对多(一个会话对多个流)关系,并且每个会话似乎在流之间划分表行的方式略有不同。

以一个包含30行的表为例,我们希望用3个进程导出该表,每个进程处理一个行流。格式化在手机上可能看起来很奇怪。

os.fork()
Process 1              Process 2              Process 3
Session1               Session2               Session3
*Stream1 - 10 rows     Stream1 - 8 rows       Stream1 - 9 rows
Stream2 - 10 rows      *Stream2 - 12 rows     Stream2 - 11 rows
Stream3 - 10 rows      Stream3 - 10 rows      *Stream3 - 10 rows

在这个例子中,我们最终得到32个输出行,因为每个会话并没有以完全相同的方式定义其流。

我尝试使用线程(下面的代码)而不是进程,这很有效,因为gRPC是线程安全的。

# create read session here

# Then call the target worker function with one thread per worker
for i in range(0, num_streams):
t = threading.Thread(target=extract_table, args=(i,))
t.start()

然而,最大的问题是使用8个线程所需的时间与使用1个线程所用的时间一样长,并且无论现在使用多少个线程,跨线程的聚合吞吐量似乎都最高可达约5 MB/s。

这与使用进程形成了鲜明对比,在过程中,吞吐量似乎随着工人的增加而线性扩展(我在一些测试中看到高达约100 MB/s)。。。在极少数情况下,我能够让它在没有segfault干扰的情况下工作。这似乎纯粹是运气。

使用1个线程:

总时间:~3:11

使用8个线程:

总时间:~3:15

据我所知,使用多个线程基本上没有速度优势。

如果有人对我遗漏的任何东西有任何想法,请告诉我!我很希望能让它发挥作用。我真的很喜欢BQ Storage API的功能(行过滤器、列选择、无导出限制),但在找到适当的方法来扇出阅读器之前,我们将无法使用它。

grpcio(由google-cloud-bigquery-storage库使用)和多处理存在已知问题。根据该代码示例;工人子过程在任何gRPC服务器启动之前被分叉";。

由于您的工作负载主要受I/O限制,全局解释器锁定不应该是主要的性能瓶颈。我建议使用线程来分发工作,就像在google-cloud-bigquery库中所做的那样。

替换:

with mp.Pool(processes=ncpus_buffer) as p:
result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

带有:

with concurrent.futures.ThreadPoolExecutor(max_workers=num_streams) as p:
result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

最新更新