Kafka Streams does not work with multiple instances



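When I run multiple instances of my Kafka Streams application, only the first instance receives messages correctly. Any new instances I start do not receive any messages.

Any suggestions for fixing this?

Here is my Kafka Streams application: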

package test.kafkastream;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-wordcount-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");
        //props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

        // setting offset reset to earliest so that we can re-run the demo code
        // with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "topic6");
        builder.addProcessor("Process", new ProcessMessage(), "Source");
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

Here is my producer:

package test.kafkamesos;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
public class Producer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<String, Object>();
        producerConfig.put("bootstrap.servers", "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");
        //producerConfig.put("bootstrap.servers", "localhost:9092");
        // optional:
        producerConfig.put("metadata.fetch.timeout.ms", "3000");
        producerConfig.put("request.timeout.ms", "3000");
        // ... other options:
        // http://kafka.apache.org/documentation.html#producerconfigs
        ByteArraySerializer serializer = new ByteArraySerializer();
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(producerConfig, serializer,
                serializer);
        int i = 0;
        while (true) {
            String message = "{data:success,g:" + i + "}";
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("topic6", message.getBytes());
            kafkaProducer.send(record).get();
            System.out.println("sending " + message);
            Thread.sleep(1000);
            i++;
        }
    }
}

And my Dockerfile:

FROM openjdk:8-jre
COPY ./target/*-with-dependencies.jar /jars/service-jar.jar
CMD java -cp /jars/service-jar.jar test.kafkastream.Main
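
I believe you are running into this issue because the Kafka broker is configured with only one partition for the topic you are using (topic6). From the Confluent blog: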

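"For example, if your application reads from a single topic that has 10 partitions, then you can run up to 10 instances of your applications (note that you can run further instances but these will be idle). In summary, the number of topic partitions is the upper limit for the parallelism of your Streams API application and thus for the number of running instances of your application."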

Source: https://www.confluent.io/blog/elastic-scaling-in-kafka-streams/
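
To make the limit concrete, here is a minimal sketch in plain Java that models how the partitions of a single input topic could be spread across running instances. It does not call Kafka. The class `PartitionAssignmentSketch` and its methods `assign` and `idleInstances` are hypothetical names made up for this illustration, and the round-robin distribution is a simplifying assumption rather than the actual Kafka Streams assignor. The point it shows holds either way: a partition is owned by at most one instance, so instances beyond the partition count get nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not part of the Kafka API.
class PartitionAssignmentSketch {

    // Returns, for each instance, the partition numbers it would own.
    // Assumption: partitions are handed out round-robin, one owner per partition.
    static List<List<Integer>> assign(int partitions, int instances) {
        if (partitions < 0 || instances < 1) {
            throw new IllegalArgumentException("need partitions >= 0 and instances >= 1");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % instances).add(p);
        }
        return owned;
    }

    // Counts the instances that end up with no partition and therefore sit idle.
    static long idleInstances(int partitions, int instances) {
        return assign(partitions, instances).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // One partition, three instances: only the first instance has work.
        System.out.println(assign(1, 3));        // prints [[0], [], []]
        System.out.println(idleInstances(1, 3)); // prints 2

        // Three partitions, three instances: every instance has work.
        System.out.println(assign(3, 3));        // prints [[0], [1], [2]]
    }
}
```

With one partition and three instances, two instances stay idle, which matches the behavior described in the question. If topic6 does turn out to have a single partition, its partition count can be raised with the `kafka-topics` tool (`--alter` together with `--partitions`), after which additional instances can be assigned partitions.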
