我们正在开发一个使用低级处理器API的Kafka Streams应用程序。
根据 Kafka 上的文档,所有线程和并行性都由流线程和流任务处理。并行性也可以使用主题上的分区进行扩展。
当前代码如下所示:
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Do processing on the stream thread itself
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
但是,是否建议在任何情况下创建我们自己的线程来进行实际处理?这意味着使用 Kafka Streams API 主要用于使用主题中的数据,而不是用于实际处理。实际处理将在 Kafka 流线程中的初始数据消耗之后调用的新线程中进行。
拓扑中的示例处理器:
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Spawn new thread to do the processing
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
//Do more processing
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
我已经为此尝试了最基本的代码,但无法确定它是否正在干预 Kafka 提供的自动功能。例如,自动提交偏移、超时等。
还是坚持 Kafka 流已经提供的默认行为并利用流线程快速处理数据总是更好?
不建议启动自己的线程,因为这会破坏 Kafka Streams 的容错保证。如果process()
返回,Kafka Streams 假定消息已完全处理,并且所有潜在的输出消息都通过forward()
发送。对于这种情况,Kafka 流可能会提交输入记录偏移量。
但是,如果在后台线程中处理消息,并且线程处理失败,则 Kafka Streams 将不知道任何相关信息,因此,即使发生故障并且消息将丢失,也可能提交偏移量。
此外,不允许后台线程在返回后调用process()
forward()
。如果forward()
被称为process()
Kafka Streams 的"外部",则会引发异常。
使用自己的后台线程并保留至少一次处理保证并非不可能,但是,它相当复杂,因此不建议使用。
Kafka 流使用 kafka consumer API 来消费来自 kafka 主题的消息。这意味着,即使您生成多个线程来使用消息,额外的线程也将保持空闲状态。
例如,如果你的主题有 5 个分区,即使生成 10 个线程从主题读取,kafka 消费者 API 也只会使用 5 个线程从主题读取,其余线程将处于空闲状态。
您可以在定义流配置时定义要生成的线程数。
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); //Here number of threads being spawned per kafka streams app node is 1
因此,如果您的主题有 10 个分区,并且您的 kafka 流应用程序部署在两个节点上,则NUM_STREAM_THREADS_CONFIG将为 5。
如果您需要更多帮助,请告诉我!