如何在运行时使用参数更改数据流作业图



我使用Dataflow从JDBC表中读取数据,并将结果加载到BigQuery表中。有一个参数";标志";如果该标志设置为True,则结果应加载到BigQuery中的一个附加表中。总结:

  1. 如果标志设置为False-从JDBC读取表A,则在BigQuery中写入表A
  2. 如果该标志设置为True-从JDBC读取表A,则在BigQuery中写入表A和表B

请参阅我的管道的样本代码

public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline pipeline = Pipeline.create(options);
ValueProvider < String > gcsFlag = options.getGcsFlag();
PCollection < TableRow > inputData = pipeline.apply("Reading JDBC Table",
JdbcIO. < TableRow > read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create(options.getDriverClassName(), options.getJdbcUrl())
.withUsername(options.getUsername()).withPassword(options.getPassword()))
.withQuery(options.getSqlQuery())
.withCoder(TableRowJsonCoder.of())
.withRowMapper(new CustomRowMapper()));
inputData.apply(
"Write to BigQuery Table 1",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to(options.getOutputTable()));
if (gcsFlag.get().equals("TRUE")) {
inputData.apply(
"Write to BigQuery Table 2",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.to(options.getOutputTable2()));
}
pipeline.run();
}

我面临的挑战是在编译和创建数据流模板的过程中必须通过ValueProvider。作业图仅在编译时构建,我无法在其他情况下再次使用相同的模板。

有没有一种方法可以让我在运行时传递ValueProvider<String> flag,并在运行时构建作业图?有了这个,我可以对这两种情况重用相同的模板。类似地,我也想在运行时提供sqlQuery(options.getSqlQuery(((。这样我就可以对所有要从Source读取的表使用相同的模板。

感谢您的帮助。

创建DAG时,它在运行时无法更改。

但是,你仍然有机会解决你的问题。尝试梁分区模式https://beam.apache.org/documentation/transforms/java/elementwise/partition/

最新更新