在Apache Beam管道中运行下一步之前,BigQuery架构更改未传播



我正在使用apachebeam在一个管道中实现3个步骤。

  1. 从mongodb读取数据
  2. 如果mongodb数据中有新列,请更新bigquery中的架构,并为beam.io.WriteToBigQuery构建新架构
  3. 将数据保存到bigquery中

我注意到在步骤2中更新模式之前执行了步骤3;读取数据时出错,错误消息:JSON表遇到太多错误,放弃了"。但是,如果我再次运行相同的代码,数据可以成功保存。看起来在步骤2中更新了在完整模式之前执行的第一次步骤3。

我是Apache beam的新手。你能帮忙吗?谢谢我的代码附在下面。

dim_seller_etl_executor = (
p1
| "read" >> beam.io.ReadFromMongoDB(uri='mongodb:///',
db='',
coll='',
bucket_auto=True,
extra_client_params={"username": "",
"password": ""})
| "transform" >> beam.Map(transform_doc)
| 'save' >> beam.io.Write((beam.io.WriteToBigQuery("table_id",
schema=table_schema_for_beam,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
)
def transform_doc(document):
global table_schema_for_beam
global column_name_type
new_columns = []
for name, value in document.items():
if name not in column_name_type:
# some ways to get the column type
new_columns.append((name, column_type))
else:
column_type = column_name_type[name]
data[name] = document[name] if document[name] is not None else None
# if new columns appear, update the schema in bigquery and the schema used in beam.io.WriteToBigQuery
if new_columns:
bigquery_schema.add_columns(new_columns)
table_schema_for_beam, column_name_type = bigquery_schema.get_table_schema_for_beam()
return data

我可以看到两个问题:

  1. BigQuery架构更改不会立即传播,但传播可能需要几分钟时间https://cloud.google.com/bigquery/docs/error-messages#metadata-流插入错误
  2. table_schema_for_beam变量在管道施工期间进行评估。在管道运行时更改这些变量可能会产生未知的影响,或者根本没有影响

最新更新