我正在尝试编写一个概念验证,它从Kafka获取消息,使用Flink上的Beam转换它们,然后将结果推送到不同的Kafka主题上。
我使用KafkaWindowedWordCountExample作为起点,这是我想做的第一部分,但它输出到文本文件而不是Kafka。FlinkKafkaProducer08看起来很有前途,但我不知道如何将其插入管道。我以为它会用UnboundedFlinkSink或类似的东西包裹,但这似乎不存在。
对我要做的事情有什么建议或想法吗?
我正在运行最新的孵化器光束(截至昨晚从Github),集群模式下的Flink 1.0.0和Kafka 0.9.0.1,所有这些都在Google Compute Engine(Debian Jessie)上。
Beam中目前没有UnboundedSink类。大多数无限接收器都是使用 ParDo
实现的。
您可能希望查看 KafkaIO 连接器。这是一个适用于所有 Beam 运行器的 Kafka 读取器,并实现了并行读取、检查点和其他UnboundedSource
API。该拉取请求还包括 TopHashtags 示例管道中的粗汇,方法是在 ParDo
中写入 Kafka:
class KafkaWriter extends DoFn<String, Void> {
private final String topic;
private final Map<String, Object> config;
private transient KafkaProducer<String, String> producer = null;
public KafkaWriter(Options options) {
this.topic = options.getOutputTopic();
this.config = ImmutableMap.<String, Object>of(
"bootstrap.servers", options.getBootstrapServers(),
"key.serializer", StringSerializer.class.getName(),
"value.serializer", StringSerializer.class.getName());
}
@Override
public void startBundle(Context c) throws Exception {
if (producer == null) { // in Beam, startBundle might be called multiple times.
producer = new KafkaProducer<String, String>(config);
}
}
@Override
public void finishBundle(Context c) throws Exception {
producer.close();
}
@Override
public void processElement(ProcessContext ctx) throws Exception {
producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
}
}
当然,我们也想在KafkaIO
中添加接收器支持。它实际上与上面的KafkaWriter
相同,但使用起来要简单得多。
写入Kafka的Sink转换于2016年添加到Apache Beam/Dataflow中。有关使用示例,请参阅 Apache Beam 中的 JavaDoc KafkaIO
。