将 Kafka 消费者和生产者集成到一个函数中



我们需要开发一个代码,其中消费者运行侦听特定的kafka生产者,然后在同一个函数中生成从当前消费的数据到不同的生产者主题的处理数据。

这是为了将 flinks 代码与 Java 集成,其中 Java 会向一个主题生成一条消息,然后 flink 使用它并生成指向不同主题的新数据,以便 Java 进一步处理它。

请让我们知道是否有其他方法可以完成此过程。

Flink 与 Kafka 很好地集成,如果需要,可以利用 Kafka 事务。这样的应用程序看起来像这样:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<> consumer = new FlinkKafkaConsumer011<IN>(topic_in, serializer_in, kafkaProperties);
FlinkKafkaProducer011<> producer = new FlinkKafkaProducer011<OUT>(broker, topic_out, serializer_out)
env.addSource(consumer)
   .map(new SuitableTransformation())
   .addSink(producer)
   .execute()

听起来像是"读-处理-写"模式。你可以利用 Kafka 的事务功能使这个过程原子化(或者不是,这取决于你,但下面的例子使用事务(:

KafkaProducer producer = createKafkaProducer(
  "bootstrap.servers", "localhost:9092",
  "transactional.id", "my-transactional-id");
producer.initTransactions();
KafkaConsumer consumer = createKafkaConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id",
  "isolation.level", "read_committed");
consumer.subscribe(singleton("inputTopic"));
while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord("outputTopic", record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group);  
  producer.commitTransaction();
}

这是为了将 flinks 代码与 Java 集成,其中 Java 会向一个主题生成一条消息,然后 flink 使用它并生成指向不同主题的新数据,以便 Java 进一步处理它。

您可能需要考虑 Kafka Streams: https://docs.confluent.io/current/streams/developer-guide/index.html

这听起来像是使用 akka 流的好地方,

    val done =
  Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
    .map(msg => ProducerMessage.Message(new ProducerRecord[Array[Byte], String]("topic2", msg.record.value), msg.committableOffset))
    .via(Producer.flow(producerSettings))
    .map(_.message.passThrough)
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)

最新更新