将pandas dataframe(从CSV)批量写入BigQuery



我有一组CSV文件，想复制其中的行并按顺序把它们推送到BigQuery（BQ）。目前，我使用pandas读取CSV文件，并通过to_gbq方法将数据写入BigQuery。但是，由于文件很大（每个文件有几GB），我希望以分批的方式导入数据，以避免出现内存错误。

我写了以下更新的函数,bq-client似乎比to_gbq快。

from google.cloud import bigquery
import pandas
from tqdm import tqdm
import warnings
# Surface every deprecation-style warning each time it is triggered.
# The order is kept: filters registered later take precedence.
for _category in (PendingDeprecationWarning, DeprecationWarning):
    warnings.simplefilter("always", category=_category)
def df_to_bq(df, table_id, table_schema, batch_size=None):
    """Load a pandas DataFrame into a BigQuery table, optionally in row batches.

    Each load job is awaited before the next one is submitted, so a failed
    load raises here instead of being silently dropped, and the progress
    message is only printed once the batch has actually been loaded.

    Args:
        df: DataFrame to load.
        table_id: Destination table identifier, passed straight to
            ``Client.load_table_from_dataframe``.
        table_schema: Schema (list of ``bigquery.SchemaField``) used for the
            load job configuration.
        batch_size: Number of rows per load job. ``None`` (the default) sends
            the whole frame in a single job.

    Raises:
        ValueError: If ``batch_size`` is given but is not positive.
    """
    # Validate before creating a client: a zero step makes ``range`` fail with
    # an unrelated message and a negative step would silently load nothing.
    if batch_size is not None and batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    client = bigquery.Client(project='high-theme-12435')
    job_config = bigquery.LoadJobConfig(
        schema=table_schema,
        source_format=bigquery.SourceFormat.CSV,
    )

    if batch_size is None:
        job = client.load_table_from_dataframe(
            df, table_id, job_config=job_config
        )
        # Block until the load finishes; raises if the job failed.
        job.result()
        return

    # Wrapping the range (not the enumerate) lets tqdm know the total count.
    for batch_no, i in enumerate(tqdm(range(0, len(df), batch_size))):
        batch_df = df.iloc[i: i + batch_size]
        job = client.load_table_from_dataframe(
            batch_df, table_id, job_config=job_config
        )
        # Wait for this batch before reporting it as done.
        job.result()
        print(f"### DUMP to BQ done for batch {batch_no}. ({i} to {i+len(batch_df)}.) ###")
# Column layout of the destination table: one SchemaField per
# (column name, BigQuery type) pair, in the order listed.
table_schema = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("col1", "INTEGER"),
        ("col2", "STRING"),
        ("col3", "TIMESTAMP"),
        ("col4", "FLOAT"),
    )
]
import pandas as pd
import datetime as dt
from dateutil import parser
# Read the CSV into a single DataFrame; no chunksize is given, so the whole
# file is parsed in one go.
# NOTE(review): for multi-GB inputs consider read_csv(chunksize=...) so the
# frame never has to fit in memory at once.
df = pd.read_csv('test.csv')
def from_iso_date(date_str):
    """Parse a date string into a datetime, mapping missing values to None.

    Args:
        date_str: Text to parse, or a missing-value marker (``None``, NaN,
            ``NaT``, ``pd.NA``) or an empty string.

    Returns:
        The result of ``dateutil.parser.parse(date_str)``, or ``None`` when
        the input is missing or empty.
    """
    # Cells that pandas read as missing arrive as float NaN, which is truthy,
    # so a plain ``not date_str`` check would let them reach the parser.
    # ``pd.isna`` is tested first so ``pd.NA`` is never evaluated as a bool.
    if pd.isna(date_str) or not date_str:
        return None
    return parser.parse(date_str)
# Destination table (presumably written as project.dataset.table).
table_id = 'high-theme-12435.test.test_table'

# Run every col3 value through from_iso_date before loading.
df['col3'] = df['col3'].apply(from_iso_date)
# Add a column holding the current time, the same value on every row.
df['timecol'] = pd.to_datetime('now')

# Push the frame to BigQuery in batches of 1000 rows.
df_to_bq(df=df, table_id=table_id, table_schema=table_schema, batch_size=1000)

最新更新