Kafka - 根据标头值将邮件从"Topic A"重定向到"Topic B"



我想将 kafka 消息从名为"all-topic"的主题重定向到名为"headervalue-topic"的主题,其中 headervalue 是每条消息具有的自定义标头的值。

目前,我正在使用一个自定义控制台应用程序,该应用程序使用消息并将消息重定向到正确的主题,但它每秒仅处理 16 条消息。

kafka 和 zookeeper 都在 docker 容器中运行,配置如下:

zookeeper:
  image: "wurstmeister/zookeeper:latest"
  restart: always
  ports:
    - "2181:2181"
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_SERVER_ID: 1
kafka:
  hostname: kafka
  image: "wurstmeister/kafka:latest"
  restart: always
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
  environment:
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_ADVERTISED_HOST_NAME: kafka
    KAFKA_ADVERTISED_PORT: 9092

实现目标的最佳和最快方法是什么?

我确实知道Kafka Streams的存在,但我不熟悉Java,所以如果你想建议Kafka Streams一个小例子,将不胜感激:)

非常感谢!

这是我使用 kafka-streams nodejs 库提出的解决方案:

const {KafkaStreams} = require("kafka-streams");
const {nativeConfig: config} = require("./config.js");
const kafkaStreams = new KafkaStreams(config);
const myConsumerStream = kafkaStreams.getKStream("all-topic");
myConsumerStream
    .mapJSONConvenience()
    .filter((element) => {
        return element.value.type == "Article";
    })
    .tap((element) => {console.log("Got Article")})
    .mapWrapKafkaValue()
    .to("Article-topic", 1, "buffer");
myConsumerStream.start();

我所知,您无法直接通过DSL访问标头。不过,您可以使用流处理器通过ProcessorContext访问它,这是我想出的一个小例子:

public class CustomProcessor1 implements Processor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext processorContext) {
    this.context = processorContext;
}
@Override
public void process(String key, String value) {
    HashMap<String, String> headers = new HashMap<>();
    for (Header header : context.headers()) {
            headers.put(header.key(), new String(header.value()));
    }
    String headerValue = headers.get("certainHeader").replace(""", "");
    if (headerValue.equals("expectedHeaderValue")) {
        context.forward(key, value);
    }
}

上面是处理器,它将使用与 headerValue 匹配的 certainHeader 转发消息到下游进程。创建流式处理拓扑时将使用此处理器,如下所示:

public static void main(String[] args) throws Exception {
    Properties props = getProperties();
    final Topology topology = new Topology()
            .addSource("SOURCE", "all.topic")
            .addProcessor("CUSTOM_PROCESSOR_1", CustomProcessor1::new, "SOURCE")
            .addProcessor("CUSTOM_PROCESSOR_2", CustomProcessor2::new, "SOURCE")
            .addSink("SINK1", "headervalue1-topic", "CUSTOM_PROCESSOR_1")
            .addSink("SINK2", "headervalue2-topic", "CUSTOM_PROCESSOR_2");

相关内容

  • 没有找到相关文章

最新更新