我目前正在使用Java SDK构建流媒体管道,并尝试使用BigQueryIO write/writeTableRows写入BigQuery分区表。我探索了几种模式,但都没有成功;他们中很少有人在下面。
- 使用SerializableFunction确定TableDestination
.withSchema(TableSchemaFactory.buildLineageSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) or CREATE_NEVER
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
and then calling this function inside the .to() method
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> input) {
TimePartitioning timePartitioning = new TimePartitioning();
timePartitioning.setField("processingdate");
String dest = String.format("%s.%s.%s", project, dataset, table);
return new TableDestination(dest, null, timePartitioning);
我还尝试格式化从输入中获得的分区列,并将其添加为带有$annotation的String位置的一部分,如下所示:
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> input) {
input.get("processingDate")
...convert to string MMddYYYY format
TimePartitioning timePartitioning = new TimePartitioning();
timePartitioning.setField("processingdate");
String dest = String.format("%s.%s.%s$%s", project, dataset, table, convertedDate);
return new TableDestination(dest, null, timePartitioning);
然而,它们都没有成功,也没有失败
- 时间戳无效
- 时间戳字段值超出范围
- 相对于当前日期,您只能在过去0天和将来0天内流式传输到分区
- 流式传输不支持目标表的分区。您只能流式传输到日期分区表的元表
- 不允许流式传输到基于列的分区表的元数据分区
我似乎找不到合适的组合。以前有人遇到过同样的问题吗?有人能给我指一个正确的方向或给我一些建议吗?我想要实现的是根据定义的日期列而不是处理时间加载流数据。
谢谢!
如果您从dest
中删除分区装饰器,我预计这些问题中的大多数都会得到解决。在大多数情况下,用于加载数据的BigQueryAPI将能够根据消息本身找出正确的分区。
因此,尝试将dest
的定义更改为:
String dest = String.format("%s.%s.%s", project, dataset, table);