为什么我的Python BigQuery Dataflow接收器没有将记录插入数据库



我使用Python(2.7),并在谷歌的DataFlow环境中工作,不用说,谷歌还没有完全清除所有内容,文档还不够。然而,从数据流到BigQuery的写入部分在这里记录了BigQuerySink。

根据文档,为了指定模式,您需要输入一个字符串:

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'

表名、项目ID和数据集ID如下:`example_project_ID:example_dataset_ID.example_table_name`

现在,所有这些都在起作用。请参阅下面的代码,但从我所看到的,它成功地创建了表和字段。注意:项目ID被设置为函数参数的一部分。

bq_data | beam.io.Write(
"Write to BQ", beam.io.BigQuerySink(
'example_dataset_id.{}'.format(bq_table_name),
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)

现在,看起来我可以通过使用以下内容插入内容:

bq_data = pipeline | beam.Create(
[{
'field_1': 'ExampleIdentifier',
'field_2': 'ExampleValue',
'field_3': 'ExampleFieldValue',
'created_at': '2016-12-26T05:50:39Z',
'updated_at': '2016-12-26T05:50:39Z',
'field_4': 'ExampleDataIdentifier',
'field_5: 'ExampleData'
}]
)

但出于某种原因,当将值打包到PCollection中时,它说它插入到BigQuery中,但当我查询表时,它什么也不显示。

为什么不插入?我没有看到任何错误,但没有任何内容插入到BigQuery中。

这就是PCollection中包含的数据的样子,我有将近1100行要插入:

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}

注意:我检查了日期格式,上面的日期格式允许BigQuery插入。

这个答案已经很晚了,但也许它会帮助其他人。您在管道中的写入语句写得不正确。

bq_data | 'Write to BigQuery' >> 
beam.io.Write(beam.io.BigQuerySink(known_args.output_table, 
schema=schema, 
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) # This is overwrite whatever you have in your table

我尝试了一个使用您的确切模式和输入的示例,它对我有效。我必须进行以下修复。

(1) 似乎你没有在你的论点中指定一个项目。您可能在管道定义中指定了此项,因为您没有看到此项的错误。(2) 你上面提到的代码中有一个拼写错误。'field_5: 'ExampleData'应为'field_5': 'ExampleData'但我认为这只是这个问题中的一个拼写错误,而不是在你最初的管道中,因为你没有收到错误。

您正在运行最新版本的Dataflow吗?你可以尝试创建一个新的虚拟环境,并运行"pip-install谷歌云数据流"来安装最新版本。

可以和我分享你的全部吗?

由于您使用的是"DirectPipelineRunner",因此很难远程调试。是否可以尝试使用"DataflowPipelineRunner"运行相同的管道(请注意,您需要一个为此启用计费的GCP项目)?如果您可以使用"DataflowPipelineRunner"运行日志并提供作业id,我将能够查看日志。

最新更新