如何从 Flink 运行器上的 Google Dataflow (Apache Beam) 向 Kafka 发送消息



我正在尝试编写一个概念验证,它从Kafka获取消息,使用Flink上的Beam转换它们,然后将结果推送到不同的Kafka主题上。

我使用KafkaWindowedWordCountExample作为起点,这是我想做的第一部分,但它输出到文本文件而不是Kafka。FlinkKafkaProducer08看起来很有前途,但我不知道如何将其插入管道。我以为它会用UnboundedFlinkSink或类似的东西包裹,但这似乎不存在。

对我要做的事情有什么建议或想法吗?

我正在运行最新的孵化器光束(截至昨晚从Github),集群模式下的Flink 1.0.0和Kafka 0.9.0.1,所有这些都在Google Compute Engine(Debian Jessie)上。

Beam中目前没有UnboundedSink类。大多数无限接收器都是使用 ParDo 实现的。

您可能希望查看 KafkaIO 连接器。这是一个适用于所有 Beam 运行器的 Kafka 读取器,并实现了并行读取、检查点和其他UnboundedSource API。该拉取请求还包括 TopHashtags 示例管道中的粗汇,方法是在 ParDo 中写入 Kafka:

class KafkaWriter extends DoFn<String, Void> {
  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;
  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer",    StringSerializer.class.getName(),
        "value.serializer",  StringSerializer.class.getName());
  }
  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }
  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close();
  }
  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}

当然,我们也想在KafkaIO中添加接收器支持。它实际上与上面的KafkaWriter相同,但使用起来要简单得多。

用于

写入Kafka的Sink转换于2016年添加到Apache Beam/Dataflow中。有关使用示例,请参阅 Apache Beam 中的 JavaDoc KafkaIO

相关内容

  • 没有找到相关文章

最新更新