我需要将消息从bigquery推送到pubsbub
bigquery表包含一个以json值作为字符串的列
读取该表并将其发布到json
p.apply("ReadSourceBQ";,BigQueryIO.readTableRows((.fromQuery("SELECT requestPayload FROM eventreplay"(.使用StandardSql((.withTemplateCompatibility(().apply("JSON转换",AsJsons.of(TableRow.class((.apply("WriteToPubSub";,PubsubIO.writeStrings((.to("主题名称"(;
它不会将消息发布到主题。。没有错误。如何将tableRow转换为json
听起来所提供的示例应该可以工作。请确保您正在运行p.run()
,因为实际运行管道非常重要。
下面是一个完整的例子(使用公共BigQuery数据集(,它对我有用:
public static void main(String[] args) {
PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(pipelineOptions);
pipeline
.apply(
"ReadSourceBQ",
BigQueryIO.readTableRows()
.fromQuery("SELECT gameId FROM `bigquery-public-data`.`baseball`.`games_post_wide`")
.usingStandardSql()
.withTemplateCompatibility())
.apply("JSON Transform", AsJsons.of(TableRow.class))
.apply(
"WriteToPubSub",
PubsubIO.writeStrings().to("projects/{project_id}/topics/{topic_name}"));
pipeline.run();
}
此外,请注意AsJsons
来自beam-sdks-java-extensions-json-jackson
,因此您可能必须使用以下方法导入:
Maven
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-json-jackson</artifactId>
<version>${beam.version}</version>
</dependency>
渐变
implementation group: 'org.apache.beam', name: 'beam-sdks-java-extensions-json-jackson', version: '${beam.version}'