如果我的 kafka 流应用程序中有一个共享变量,并且该变量由处理代码中的多个线程更新,如何处理?我是否必须使该共享变量线程安全,或者这是 Kafka 流库如何处理的?在文档中的某个地方,我读到在运行 Kafka 流应用程序时不需要在线程之间进行协调。例如,下面是一个伪代码:
KStream<byte[], byte[]> input = ...;
int counter = 0;
KStream<byte[], byte[]>[] processed = input.map(
(k, v) -> {
....
....
//update counter by multiple threads.
);
如果此代码由来自同一应用实例的多个流任务执行,会发生什么情况?变量"已处理"怎么样,因为它也可以由多个线程更新?这需要在正常的 Java 场景中进行某种同步。我很好奇这是否由 Kafka 流库处理。
谢谢!
这取决于您配置了多少线程来执行任务。如果有一个线程执行所有任务,则不必使该共享变量线程安全。但是,如果您有多个线程,则需要使其线程安全,因为应用程序实例内的任务将分布在多个线程中。您的Kafka Streams应用程序只是一个正在运行的JVM,您从main()
开始。Kafka Streams 框架根据您指定的线程数编排处理。但它只是一个常规的 Java 运行时,并发访问仍然是并发访问。
有关线程和任务的更多信息,请参阅此处:Kafka Streams 线程编号
有关线程和任务以及共享状态的更多信息:Kafka 流处理器线程安全?
显然,一般来说,您在代码示例中显示的模式是您可能希望避免的模式,除非它实际上只是在本地计算应用程序的某些内容。在运行多个应用程序实例的生产应用程序中,如果应用程序实例上升或关闭,任务将重新分配,因此共享变量可能没有用处。这就是 Kafka Streams 存储机制如此有用的原因:您的状态随着任务而移动。