最近,我已经将流应用程序从Spark-Streaming 2.1切换到使用Kafka Broker Server 0.11.0.0
使用kafka-tream-stream-streaming new API(1.0(我已经实现了自己的处理器类,在过程方法中,我刚刚打印了消息内容。
我有一个Kafka群集,由3台机器组成,我正在挂上的主题有300个分区。
我在带有32 GB的RAM和8核的机器上运行了带有100个线程的流应用程序。
我的问题是,在某些情况下,我收到了一旦到达Kafka主题/分区的消息,在其他情况下,我在10-15分钟到达主题后得到了消息,不知道为什么为什么!
我使用以下命令行来跟踪group.ID的kafka主题的滞后。
./bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --new-consumer --describe --group kf_streaming_gp_id
,但不幸的是,它并没有始终如一地给出准确的结果,甚至根本没有给出结果,任何身体都知道为什么?
流媒体应用程序我错过了一些东西,以便我可以始终如一地读取消息后阅读消息吗?任何消费者属性解决了此类问题。
我的kafka-treaming应用程序结构如下:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.CLIENT_ID_CONFIG, "kf_streaming_gp_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, DocumentSerde.class);
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimeExtractor.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 100);
KStream<String, Document> topicStreams = builder.stream(sourceTopic);
topicStreams.process(() -> new DocumentProcessor(appName, environment, dimensions, vector, sinkTopic));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
我弄清楚了我的情况是什么。
事实证明,从事较高的CPU密集型工作一直存在线程,这导致阻止其他线程消耗消息,这就是为什么我看到这样的爆发,当我停止这种CPU密集型逻辑时,一切都非常快,并且一切都非常快,并且消息到达Kafka主题后,消息将进入流媒体工作。