Apache Beam - PubSub messages to several BigQuery tables based on a field



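I have a pipeline that loads PubSub messages from a topic into a BigQuery table. To process less data at query time, I would like to store the messages according to their "customer_id" field, in BigQuery tables named table-{customer_id}. However, I am struggling to figure out how to do this properly.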

Below is my pipeline. It works, but it writes to a single table rather than several.

public class PubsubAvroToBigQuery {

    public interface Options extends PipelineOptions, StreamingOptions {
        @Description("The Cloud Pub/Sub topic to read from.")
        @Required
        ValueProvider<String> getInputTopic();
        void setInputTopic(ValueProvider<String> value);

        @Description("BigQuery Table")
        @Required
        ValueProvider<String> getBigQueryTable();
        void setBigQueryTable(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        options.setStreaming(true);
        try {
            run(options);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static PipelineResult run(Options options) throws IOException {
        // Create the pipeline
        options.setStreaming(true);
        Pipeline pipeline = Pipeline.create(options);
        // Load the Avro schema from the classpath
        Schema avroSchema = ApiCalls.SCHEMA$;
        pipeline
            .apply(
                "Read PubSub Avro Message",
                PubsubIO.readAvroGenericRecords(avroSchema).fromTopic(options.getInputTopic())
            )
            .apply("Write to BigQuery", BigQueryIO.<GenericRecord>write()
                .to(options.getBigQueryTable())
                .useBeamSchema()
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .optimizedWrites());
        return pipeline.run();
    }
}

I have run many tests based on TupleTag and even on windowing (with data grouping), but none of them worked. I hope you can help. Thank you.

To implement your use case, you can use the DynamicDestinations class, which lets you choose the TableDestination for each element based on the input event.

You can use it as described in the Javadoc:

events.apply(BigQueryIO.<UserEvent>write()
    .to(new DynamicDestinations<UserEvent, String>() {
        public String getDestination(ValueInSingleWindow<UserEvent> element) {
            // here read your input and extract the customer_id
            return element.getValue().getUserId();
        }
        public TableDestination getTable(String user) {
            // here build the destination table based on what is output by getDestination method
            return new TableDestination(tableForUser(user), "Table for user " + user);
        }
        public TableSchema getSchema(String user) {
            return tableSchemaForUser(user);
        }
    })
    .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() {
        public TableRow apply(UserEvent event) {
            return convertUserEventToTableRow(event);
        }
    }));

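Note that you can also create a named class that extends DynamicDestinations and use its constructor to pass in parameters (for example, the project and dataset names) that the overridden methods can then use.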
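The Javadoc snippet leaves the table-name helper (tableForUser) undefined. As a minimal sketch of what such a helper could look like for the table-{customer_id} naming in the question, the class below builds a table spec in the project:dataset.table form that TableDestination accepts. The names CustomerTables, sanitize and tableSpecFor are hypothetical, the project and dataset values are placeholders, and the restriction to letters, digits, underscores and dashes is a deliberately conservative assumption rather than BigQuery's exact naming rule.

```java
import java.util.Objects;

/** Hypothetical helper (not part of Beam): builds per-customer BigQuery table specs. */
public final class CustomerTables {

    private CustomerTables() {}

    /**
     * Replaces every character outside [A-Za-z0-9_-] with '_'.
     * This character set is a conservative assumption, not BigQuery's full rule.
     */
    public static String sanitize(String customerId) {
        Objects.requireNonNull(customerId, "customerId");
        if (customerId.isEmpty()) {
            throw new IllegalArgumentException("customerId must not be empty");
        }
        return customerId.replaceAll("[^A-Za-z0-9_-]", "_");
    }

    /** Returns a table spec of the form project:dataset.table-{customer_id}. */
    public static String tableSpecFor(String project, String dataset, String customerId) {
        return project + ":" + dataset + ".table-" + sanitize(customerId);
    }

    public static void main(String[] args) {
        // Placeholder project and dataset names, for illustration only.
        System.out.println(tableSpecFor("my-project", "my_dataset", "42"));
        System.out.println(tableSpecFor("my-project", "my_dataset", "acme/eu west"));
    }
}
```

Inside getTable, the result could then be passed as the first argument of new TableDestination(tableSpec, description), with the project and dataset supplied through the constructor of your DynamicDestinations subclass.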
