KAFKA 1.0流媒体API:分区的消息消耗被延迟



最近,我已经将流应用程序从Spark-Streaming 2.1切换到使用Kafka Broker Server 0.11.0.0

使用kafka-tream-stream-streaming new API(1.0(

我已经实现了自己的处理器类,在过程方法中,我刚刚打印了消息内容。

我有一个Kafka群集,由3台机器组成,我正在挂上的主题有300个分区。

我在带有32 GB的RAM和8核的机器上运行了带有100个线程的流应用程序。

我的问题是,在某些情况下,我收到了一旦到达Kafka主题/分区的消息,在其他情况下,我在10-15分钟到达主题后得到了消息,不知道为什么为什么!

我使用以下命令行来跟踪group.ID的kafka主题的滞后。

./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id

,但不幸的是,它并没有始终如一地给出准确的结果,甚至根本没有给出结果,任何身体都知道为什么?

流媒体应用程序我错过了一些东西,以便我可以始终如一地读取消息后阅读消息吗?任何消费者属性解决了此类问题。

我的kafka-treaming应用程序结构如下:

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
        config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);
        KStream<String, Document> topicStreams = builder.stream(sourceTopic);
        topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

我弄清楚了我的情况是什么。

事实证明,从事较高的CPU密集型工作一直存在线程,这导致阻止其他线程消耗消息,这就是为什么我看到这样的爆发,当我停止这种CPU密集型逻辑时,一切都非常快,并且一切都非常快,并且消息到达Kafka主题后,消息将进入流媒体工作。

相关内容

  • 没有找到相关文章

最新更新