Why do we need to use threads to run a Kafka consumer? How many threads do we need?



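I'm fairly new to Java (I have some experience with Scala) and am currently trying to learn Kafka. I came across the following example in a tutorial (I'm including the code mainly for reference):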

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ConsumerDemoWithThread {
    public static void main(String[] args) {
        new ConsumerDemoWithThread().run();
    }

    private ConsumerDemoWithThread() {
    }

    private void run() {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "my-sixth-application";
        String topic = "first_topic";
        // latch for dealing with multiple threads
        CountDownLatch latch = new CountDownLatch(1);
        // create the consumer runnable
        logger.info("Creating the consumer thread");
        Runnable myConsumerRunnable = new ConsumerRunnable(
                bootstrapServers,
                groupId,
                topic,
                latch
        );
        // start the thread
        Thread myThread = new Thread(myConsumerRunnable);
        myThread.start();
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Caught shutdown hook");
            ((ConsumerRunnable) myConsumerRunnable).shutdown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("Application has exited");
        }
        ));
        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error("Application got interrupted", e);
        } finally {
            logger.info("Application is closing");
        }
    }

    public class ConsumerRunnable implements Runnable {
        private final CountDownLatch latch;
        private final KafkaConsumer<String, String> consumer;
        private final Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class.getName());

        public ConsumerRunnable(String bootstrapServers,
                                String groupId,
                                String topic,
                                CountDownLatch latch) {
            this.latch = latch;
            // create consumer configs
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // create consumer
            consumer = new KafkaConsumer<String, String>(properties);
            // subscribe consumer to our topic(s)
            consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            // poll for new data
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0.0
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("Key: " + record.key() + ", Value: " + record.value());
                        logger.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                    }
                }
            } catch (WakeupException e) {
                logger.info("Received shutdown signal!");
            } finally {
                consumer.close();
                // tell our main code we're done with the consumer
                latch.countDown();
            }
        }

        public void shutdown() {
            // the wakeup() method is a special method to interrupt consumer.poll()
            // it will throw the exception WakeupException
            consumer.wakeup();
        }
    }
}

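What I mainly want to understand is:

  1. What is the benefit of running the consumer in a thread? (I thought Kafka abstracts away the distribution of load among consumers anyway.)
  2. When we use Thread myThread = new Thread(myConsumerRunnable);, does this run in a single thread or across multiple threads?
  3. Why do we trigger the shutdown hook through a separate thread? (From my reading of the method, this seems to be more of a Java thing than a Kafka thing.)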

> What is the benefit of running the consumer in a thread? I thought Kafka abstracts away the distribution of load among consumers anyway.

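As you can see, the consumer runs an infinite loop in its run method. Starting it on a new thread lets the main thread do other work while the consumer is already active.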
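To illustrate the point without a Kafka broker, here is a minimal sketch in which a plain loop stands in for the poll loop. The class name BackgroundLoopDemo and the method runDemo are hypothetical and not part of the tutorial; the only thing it is meant to show is that the main thread stays free while the worker loops.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the consumer: the loop below plays the role of
// the poll loop and has no Kafka dependency.
class BackgroundLoopDemo {

    // Runs a looping worker on its own thread while the caller keeps working.
    // Returns the number of loop iterations the worker completed.
    static int runDemo() {
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger iterations = new AtomicInteger();
        CountDownLatch firstIteration = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            while (running.get()) {          // analogous to while (true) { poll(); }
                iterations.incrementAndGet();
                firstIteration.countDown();  // signal that the loop is active
                Thread.yield();
            }
        }, "worker");
        worker.start();

        try {
            // The calling thread is not blocked by the loop: it can wait,
            // do other work, and decide when the worker should stop.
            firstIteration.await();
            running.set(false);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running.set(false);
        }
        return iterations.get();
    }

    public static void main(String[] args) {
        System.out.println("Worker iterations: " + runDemo());
    }
}
```

Had the loop been called directly on the main thread, nothing after it would run until the loop ended.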

> When we use Thread myThread = new Thread(myConsumerRunnable);, does this run in a single thread or across multiple threads?

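Creating the Thread object does not start a new thread yet. myThread.start(); is where the new thread begins executing. Your example program has one main thread and one consumer thread. The main thread actually just waits for the shutdown signal via CountDownLatch latch, so arguably the extra thread could have been avoided.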
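The difference between constructing a Thread and starting it can be observed with Thread.getState(). This is a small sketch using only the standard library; ThreadStartDemo, observe, and the thread name "consumer-thread" are made up for the illustration.

```java
// Hypothetical demo class; the Runnable here only records which thread ran it.
class ThreadStartDemo {

    // Returns { state before start(), state after join(), name of the thread that ran the Runnable }.
    static String[] observe() {
        String[] ranOn = new String[1];
        Thread t = new Thread(
                () -> ranOn[0] = Thread.currentThread().getName(),
                "consumer-thread");

        // The Thread object exists, but no new thread is running yet.
        Thread.State before = t.getState();

        t.start();   // this is where the second thread actually begins
        try {
            t.join(); // wait for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State after = t.getState();

        return new String[] { before.name(), after.name(), ranOn[0] };
    }

    public static void main(String[] args) {
        String[] result = observe();
        System.out.println(result[0] + " -> " + result[1] + ", ran on " + result[2]);
    }
}
```

So the program in the question has exactly two application threads doing the work discussed here: main and the one created by new Thread(myConsumerRunnable).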

> Why do we trigger the shutdown hook through a separate thread? (From my reading of the method, this seems to be more of a Java thing than a Kafka thing.)

This is a Java thing. The shutdown hook thread is not actually executed until shutdown occurs. See https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#addShutdownHook(java.lang.Thread)
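As a rough sketch of that behaviour, the snippet below registers a hook and checks that it has not run while the JVM is still alive. ShutdownHookDemo and hookRanBeforeShutdown are hypothetical names; addShutdownHook and removeShutdownHook are the standard Runtime methods.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class showing that a registered hook is only a
// not-yet-started Thread until the JVM begins shutting down.
class ShutdownHookDemo {

    // Registers a hook, checks whether it ran, then unregisters it again.
    static boolean hookRanBeforeShutdown() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Thread hook = new Thread(() -> ran.set(true));

        Runtime.getRuntime().addShutdownHook(hook);
        boolean ranEarly = ran.get(); // still false: the JVM is not shutting down

        // Unregister so this demo leaves no hook behind.
        Runtime.getRuntime().removeShutdownHook(hook);
        return ranEarly;
    }

    public static void main(String[] args) {
        System.out.println("Hook ran before shutdown: " + hookRanBeforeShutdown());

        // This hook stays registered and runs when the JVM exits,
        // for example after main returns or on Ctrl+C.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("Shutdown hook running")));
    }
}
```

The hook has to be passed as a Thread because the JVM itself starts it during shutdown; in the question's code that hook thread calls shutdown() on the consumer and then waits on the latch until the consumer has closed.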
