我想将 kafka 消息从名为"all-topic"的主题重定向到名为"headervalue-topic"的主题,其中 headervalue 是每条消息具有的自定义标头的值。
目前,我正在使用一个自定义控制台应用程序,该应用程序使用消息并将消息重定向到正确的主题,但它每秒仅处理 16 条消息。
kafka 和 zookeeper 都在 docker 容器中运行,配置如下:
zookeeper:
image: "wurstmeister/zookeeper:latest"
restart: always
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
kafka:
hostname: kafka
image: "wurstmeister/kafka:latest"
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
实现目标的最佳和最快方法是什么?
我确实知道Kafka Streams的存在,但我不熟悉Java,所以如果你想建议Kafka Streams一个小例子,将不胜感激:)
非常感谢!
这是我使用 kafka-streams nodejs 库提出的解决方案:
const {KafkaStreams} = require("kafka-streams");
const {nativeConfig: config} = require("./config.js");
const kafkaStreams = new KafkaStreams(config);
const myConsumerStream = kafkaStreams.getKStream("all-topic");
myConsumerStream
.mapJSONConvenience()
.filter((element) => {
return element.value.type == "Article";
})
.tap((element) => {console.log("Got Article")})
.mapWrapKafkaValue()
.to("Article-topic", 1, "buffer");
myConsumerStream.start();
我所知,您无法直接通过DSL访问标头。不过,您可以使用流处理器通过ProcessorContext访问它,这是我想出的一个小例子:
public class CustomProcessor1 implements Processor<String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(String key, String value) {
HashMap<String, String> headers = new HashMap<>();
for (Header header : context.headers()) {
headers.put(header.key(), new String(header.value()));
}
String headerValue = headers.get("certainHeader").replace(""", "");
if (headerValue.equals("expectedHeaderValue")) {
context.forward(key, value);
}
}
上面是处理器,它将使用与 headerValue 匹配的 certainHeader 转发消息到下游进程。创建流式处理拓扑时将使用此处理器,如下所示:
public static void main(String[] args) throws Exception {
Properties props = getProperties();
final Topology topology = new Topology()
.addSource("SOURCE", "all.topic")
.addProcessor("CUSTOM_PROCESSOR_1", CustomProcessor1::new, "SOURCE")
.addProcessor("CUSTOM_PROCESSOR_2", CustomProcessor2::new, "SOURCE")
.addSink("SINK1", "headervalue1-topic", "CUSTOM_PROCESSOR_1")
.addSink("SINK2", "headervalue2-topic", "CUSTOM_PROCESSOR_2");