如何在同一盒子上独立于同一盒子上运行多个Kafka消费者



我有两个kafka消费者ConsumerAConsumerB。我想在同一台计算机上运行这两个Kafka消费者。他们之间根本没有关系。这两个Kafka消费者将在同一台计算机上处理不同的主题。

  • 每个消费者都应具有不同的属性对象。
  • 每个消费者都应具有不同的线程池配置

以下是我的设计:

消费者类(摘要):

 public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;
    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }
    protected abstract void shutdown();
    protected abstract void run(String consumerName, Properties consumerProps);
    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}

消费类别:

public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[], byte[]> consumer;
    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }
    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());
        Map<String, Object> config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn",
                                      record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= ", ex);
            if (!closed.get()) throw e;             
        } catch (Exception ex) {
            System.out.println("error= ", ex);      
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

消费者B类:

// similar to `ConsumerA` but with specific details of B

commuterHandler类:

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();
  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }
  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceConsumer.shutdown();
        try {
          executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

下面是我项目之一中的主要课程,如果我启动服务器,则将自动进行呼叫,从这个地方开始我的所有Kafka消费者,我执行ConsumerAConsumerB。一旦调用了关闭,我就会通过对所有Kafka消费者进行关闭来发布所有资源。

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
@Singleton
@DependencyInjectionInitializer
public class Initializer {
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;
  @PostConstruct
  public void init() {
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3);
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3);
  }
  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}

这是我想在同一盒子上运行多个Kafka消费者的这种问题的正确设计吗?让我知道是否有更好,有效的方法来解决此问题。总的来说,我将在同一框上运行三个或四个Kafka消费者Max,并且每个消费者都可以在需要时拥有自己的消费者组。

这是我在消费者中使用的Kafkaconsumer的Javadoc。在本文的基础上,我创建了我的消费者,只是我使用了抽象类扩展它。在该链接中搜索"将所有内容放在一起"

在文档中提到的是,消费者不是线程安全,但看来我的代码重复了池中每个线程的相同消费者实例。

public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }

解决此线程安全问题并仍然达到相同功能的最佳方法是什么?

一个快速的建议,如果您已经知道了,则很抱歉。类级变量永远不会安全。如果您需要为每个线程具有不同的属性对象,请更好地在方法级别上声明它们,并将它们作为参数提供给您需要访问属性对象的其他方法。

最简单的解决方案解决"解决此线程安全问题并仍然达到相同功能的最佳方法是什么?":

请勿实现多线程(线程API/Executor服务),而是在其自己的单独的JVM过程中使用并运行每个消费者作为单个消费者,因此,如果您需要同一机器上的4个消费者,则不想处理Mutli螺纹头痛,然后让您的kafka消费者代码罐在自己的4个单独的Java进程中运行。

尝试apache samza。它解决了这些消费者问题。线程的处理(有时甚至是有问题的)处理,通过聚类进行冗余,通过数万亿个经过验证的已处理的Mesages等进行了验证的解决方案。我们目前在集群上运行了多个工作。我们的代码比您在这里的代码要复杂得多。

相关内容

最新更新