是否建议在 Kafka 流应用程序中启动新线程(使用编程方式)?



我们正在开发一个使用低级处理器API的Kafka Streams应用程序。

根据 Kafka 上的文档,所有线程和并行性都由流线程和流任务处理。并行性也可以使用主题上的分区进行扩展。

当前代码如下所示:

public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Do processing on the stream thread itself
...
// Write back to output topic
context.forward(key, updatedValue)
}); 
}
}

但是,是否建议在任何情况下创建我们自己的线程来进行实际处理?这意味着使用 Kafka Streams API 主要用于使用主题中的数据,而不是用于实际处理。实际处理将在 Kafka 流线程中的初始数据消耗之后调用的新线程中进行。

拓扑中的示例处理器:

public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Spawn new thread to do the processing
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
//Do more processing
...
// Write back to output topic
context.forward(key, updatedValue)
}); 
}
}

我已经为此尝试了最基本的代码,但无法确定它是否正在干预 Kafka 提供的自动功能。例如,自动提交偏移、超时等。

还是坚持 Kafka 流已经提供的默认行为并利用流线程快速处理数据总是更好?

不建议启动自己的线程,因为这会破坏 Kafka Streams 的容错保证。如果process()返回,Kafka Streams 假定消息已完全处理,并且所有潜在的输出消息都通过forward()发送。对于这种情况,Kafka 流可能会提交输入记录偏移量。

但是,如果在后台线程中处理消息,并且线程处理失败,则 Kafka Streams 将不知道任何相关信息,因此,即使发生故障并且消息将丢失,也可能提交偏移量。

此外,不允许后台线程在返回后调用process()forward()。如果forward()被称为process()Kafka Streams 的"外部",则会引发异常。

使用自己的后台线程并保留至少一次处理保证并非不可能,但是,它相当复杂,因此不建议使用。

Kafka 流使用 kafka consumer API 来消费来自 kafka 主题的消息。这意味着,即使您生成多个线程来使用消息,额外的线程也将保持空闲状态。

例如,如果你的主题有 5 个分区,即使生成 10 个线程从主题读取,kafka 消费者 API 也只会使用 5 个线程从主题读取,其余线程将处于空闲状态。

您可以在定义流配置时定义要生成的线程数。

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); //Here number of threads being spawned per kafka streams app node is 1 

因此,如果您的主题有 10 个分区,并且您的 kafka 流应用程序部署在两个节点上,则NUM_STREAM_THREADS_CONFIG将为 5。

如果您需要更多帮助,请告诉我!

相关内容

  • 没有找到相关文章

最新更新