How many stream processor instances are created, and do stream tasks share the same processor instance?



I want to understand the relationship between StreamThread and StreamTask, and how many StreamProcessor instances are created, when we have the following setup:

  • A source Kafka topic with multiple partitions, e.g. 6.
  • Only one StreamThread (num.stream.threads=1).

I have a simple processor topology:

source_topic --> Processor1 --> Processor2 --> Processor3 --> sink_topic

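Each processor simply forwards the record to the next processor in the chain. Below is a snippet of one of the processors; I'm using the low-level Java API.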

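import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class Processor1 implements Processor<String, String> {
    private ProcessorContext context;

    public Processor1() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context; // keep the context so process() can forward records
    }

    // punctuate(long) belongs to the older Processor interface (deprecated since Kafka 1.0).
    // It has no @Override so the class compiles whether or not the interface still declares it.
    public void punctuate(long timestamp) {
        // no scheduled work
    }

    @Override
    public void close() {
        // no resources to release
    }

    @Override
    public void process(String key, String value) {
        System.out.println("Inside Processor1#process() method");
        context.forward(key, value); // pass the record unchanged to the child node
    }
}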
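Because forwarding is just a method call, a record travels through the whole chain on whichever thread called the source node. The plain-Java toy below models that depth-first hand-off; it has no Kafka dependency, and ToyNode and runChain are hypothetical names, not Kafka classes. Each node's process() calls its child's process() directly, which is roughly how ProcessorContext#forward behaves for a linear chain like this one.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a linear processor chain; ToyNode is hypothetical, not a Kafka class.
public class ForwardChainDemo {

    static final class ToyNode {
        private final String name;
        private final ToyNode child;      // null for the sink
        private final List<String> trace; // which node saw which key, in call order

        ToyNode(String name, ToyNode child, List<String> trace) {
            this.name = name;
            this.child = child;
            this.trace = trace;
        }

        void process(String key, String value) {
            trace.add(name + ":" + key);
            if (child != null) {
                child.process(key, value); // "forward" modelled as a plain, synchronous call
            }
        }
    }

    // Wires SOURCE -> Processor1 -> Processor2 -> Processor3 -> SINK and pushes one record.
    static List<String> runChain(String key, String value) {
        List<String> trace = new ArrayList<>();
        ToyNode sink = new ToyNode("SINK", null, trace);
        ToyNode p3 = new ToyNode("Processor3", sink, trace);
        ToyNode p2 = new ToyNode("Processor2", p3, trace);
        ToyNode p1 = new ToyNode("Processor1", p2, trace);
        ToyNode source = new ToyNode("SOURCE", p1, trace);
        source.process(key, value);
        return trace;
    }

    public static void main(String[] args) {
        runChain("k1", "v1").forEach(System.out::println);
    }
}
```

The record reaches SINK before runChain returns, so the caller cannot start the next record until the whole chain has finished with the current one.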

Main driver application snippet:

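import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application"); // appId from the printed output
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // processors expect String keys
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // and String values
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); // a single StreamThread
StreamsConfig config = new StreamsConfig(settings);
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();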

Given this setup, I have the following questions:

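  • How many processor instances (Processor1, Processor2, Processor3) will be created?
  • As I understand it, there will be six stream tasks. Is a new processor instance created for each stream task, or do they "share" the same processor instance?
  • When a stream thread is created, does it create new processor instances?
  • Are stream tasks created as part of stream thread creation?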

(New question added to the original list.)

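  • In this scenario, the single stream thread will have six stream tasks. Does the stream thread execute these stream tasks one by one, in some kind of "loop"? Do the stream tasks run as separate "threads"? Basically, I can't understand how a single stream thread can run multiple stream tasks concurrently/in parallel.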

Here is the printed topology:


KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
StreamsThread appId: my-first-streams-application
StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
Active tasks:
Running:                                StreamsTask taskId: 0_0
ProcessorTopology:
SOURCE:
topics:     [source-topic-data]
children:   [Processor1]
Processor1:
children:   [Processor2]
Processor2:
children:   [Processor3]
Processor3:
children:   [SINK]
SINK:
topic:      sink-topic-data
Partitions [source-topic-data-0]
StreamsTask taskId: 0_1
ProcessorTopology:
SOURCE:
topics:     [source-topic-data]
children:   [Processor1]
Processor1:
children:   [Processor2]
Processor2:
children:   [Processor3]
Processor3:
children:   [SINK]
SINK:
topic:      sink-topic-data
Partitions [source-topic-data-1]
StreamsTask taskId: 0_2
ProcessorTopology:
SOURCE:
topics:     [source-topic-data]
children:   [Processor1]
Processor1:
children:   [Processor2]
Processor2:
children:   [Processor3]
Processor3:
children:   [SINK]
SINK:
topic:      sink-topic-data
Partitions [source-topic-data-2]
StreamsTask taskId: 0_3
ProcessorTopology:
SOURCE:
topics:     [source-topic-data]
children:   [Processor1]
Processor1:
children:   [Processor2]
Processor2:
children:   [Processor3]
Processor3:
children:   [SINK]
SINK:
topic:      sink-topic-data
Partitions [source-topic-data-3]
StreamsTask taskId: 0_4
ProcessorTopology:
SOURCE:
topics:     [source-topic-data]
children:   [Processor1]
Processor1:
children:   [Processor2]
Processor2:
children:   [Processor3]
Processor3:
children:   [SINK]
SINK:
topic:      sink-topic-data
Partitions [source-topic-data-4]
StreamsTask taskId: 0_5
ProcessorTopology:
SOURCE:
topics:     [source-topic-data]
children:   [Processor1]
Processor1:
children:   [Processor2]
Processor2:
children:   [Processor3]
Processor3:
children:   [SINK]
SINK:
topic:      sink-topic-data
Partitions [source-topic-data-5]
Suspended:
Restoring:
New:
Standby tasks:
Running:
Suspended:
Restoring:
New:

How many processor instances (Processor1, Processor2, Processor3) will be created?

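In your example, six of each (6 tasks × 3 processors = 18 processor objects in total). Each task instantiates a full copy of the Topology. (See https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355. Note: Topology is the logical representation of your program; at runtime it is instantiated as a ProcessorTopology.)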
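A rough way to picture "each task instantiates a full copy of the Topology" is the plain-Java model below. It is not Kafka code: ToyTask, ToyProcessor, createTasks and countInstances are hypothetical names. Each task calls every node's supplier once, mirroring how the () -> new ProcessorN() lambdas passed to addProcessor() are invoked per task; with six partitions that yields six tasks and six instances of each processor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only: ToyTask/ToyProcessor are hypothetical stand-ins for StreamTask/ProcessorTopology nodes.
public class PerTaskInstantiationDemo {

    static final class ToyProcessor {
        final String nodeName;
        ToyProcessor(String nodeName) { this.nodeName = nodeName; }
    }

    static final class ToyTask {
        final String taskId;
        final List<ToyProcessor> processors = new ArrayList<>();

        ToyTask(String taskId, Map<String, Supplier<ToyProcessor>> suppliers) {
            this.taskId = taskId;
            // One get() per node per task: every task builds its own processor objects.
            for (Supplier<ToyProcessor> supplier : suppliers.values()) {
                processors.add(supplier.get());
            }
        }
    }

    // One task per source partition; ids mirror the printed 0_0 .. 0_5 (sub-topology_partition).
    static List<ToyTask> createTasks(int partitions) {
        Map<String, Supplier<ToyProcessor>> suppliers = new LinkedHashMap<>();
        suppliers.put("Processor1", () -> new ToyProcessor("Processor1"));
        suppliers.put("Processor2", () -> new ToyProcessor("Processor2"));
        suppliers.put("Processor3", () -> new ToyProcessor("Processor3"));

        List<ToyTask> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            tasks.add(new ToyTask("0_" + p, suppliers));
        }
        return tasks;
    }

    static long countInstances(List<ToyTask> tasks, String nodeName) {
        return tasks.stream()
                .flatMap(t -> t.processors.stream())
                .filter(p -> p.nodeName.equals(nodeName))
                .count();
    }

    public static void main(String[] args) {
        List<ToyTask> tasks = createTasks(6);
        System.out.println("tasks = " + tasks.size());
        System.out.println("Processor1 instances = " + countInstances(tasks, "Processor1"));
    }
}
```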

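As I understand it, there will be six stream tasks. Is a new processor instance created for each stream task, or do they "share" the same processor instance?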

Each task has its own Processor instances; they are not shared.
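The "not shared" property depends on the supplier given to addProcessor() returning a new object on every call, as () -> new Processor1() does. The plain-Java sketch below (hypothetical CountingProcessor and runTasks, no Kafka dependency) contrasts that with a supplier that keeps returning one captured instance: per-instance state, here a simple record counter, then mixes records from all six tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy model: a processor with per-instance state. Hypothetical, not a Kafka class.
public class SupplierSharingDemo {

    static final class CountingProcessor {
        int seen; // safe only if this instance belongs to exactly one task

        void process(String key, String value) {
            seen++;
        }
    }

    // Simulates `tasks` tasks; each obtains its processor from the supplier and processes `perTask` records.
    static List<CountingProcessor> runTasks(Supplier<CountingProcessor> supplier, int tasks, int perTask) {
        List<CountingProcessor> perTaskProcessors = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            CountingProcessor p = supplier.get();
            for (int r = 0; r < perTask; r++) {
                p.process("key-" + r, "value-" + r);
            }
            perTaskProcessors.add(p);
        }
        return perTaskProcessors;
    }

    public static void main(String[] args) {
        // Fresh instance per call, like () -> new Processor1().
        List<CountingProcessor> fresh = runTasks(CountingProcessor::new, 6, 10);
        System.out.println("fresh, task 0 saw " + fresh.get(0).seen);

        // Anti-pattern: the supplier hands every task the same captured instance.
        CountingProcessor single = new CountingProcessor();
        List<CountingProcessor> shared = runTasks(() -> single, 6, 10);
        System.out.println("shared, task 0 saw " + shared.get(0).seen);
    }
}
```

With the fresh supplier each task's counter ends at 10; with the shared one the single counter ends at 60, because all six tasks wrote into it.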

When a stream thread is created, does it create new processor instances?

No. New Processor instances are created when a task is created.

Are stream tasks created as part of stream thread creation?

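No. Tasks are created during a rebalance, based on the partition/task assignment. KafkaStreams registers a StreamsRebalanceListener on its internal consumer, and that listener calls TaskManager#createTasks().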
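The lifecycle described in this answer can be modelled in a few lines of plain Java. ToyStreamThread and onPartitionsAssigned are hypothetical stand-ins for StreamThread and the StreamsRebalanceListener to TaskManager#createTasks() path; the real code also handles revoked partitions, standby tasks and state restoration, which this toy ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not Kafka internals.
public class RebalanceTaskCreationDemo {

    static final class ToyStreamThread {
        final Map<String, List<Object>> tasks = new LinkedHashMap<>(); // taskId -> processor instances

        ToyStreamThread() {
            // Constructing the thread creates neither tasks nor processors.
        }

        // Invoked after a rebalance assigns source partitions to this thread.
        void onPartitionsAssigned(List<Integer> partitions) {
            for (int p : partitions) {
                List<Object> processors = new ArrayList<>();
                for (int node = 0; node < 3; node++) { // Processor1..Processor3
                    processors.add(new Object());      // stand-in for supplier.get()
                }
                tasks.put("0_" + p, processors);
            }
        }

        int processorCount() {
            return tasks.values().stream().mapToInt(List::size).sum();
        }
    }

    public static void main(String[] args) {
        ToyStreamThread thread = new ToyStreamThread();
        System.out.println("before rebalance: " + thread.tasks.size() + " tasks, "
                + thread.processorCount() + " processors");
        thread.onPartitionsAssigned(Arrays.asList(0, 1, 2, 3, 4, 5));
        System.out.println("after rebalance: " + thread.tasks.keySet() + ", "
                + thread.processorCount() + " processors");
    }
}
```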

Update (after the question was extended):

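In this scenario, the single stream thread will have six stream tasks. Does the stream thread execute these stream tasks one by one, in some kind of "loop"? Do the stream tasks run as separate "threads"? Basically, I can't understand how a single stream thread can run multiple stream tasks concurrently/in parallel.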

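Yes, the StreamThread executes its tasks in a loop; no additional threads are used to run them. Hence, tasks assigned to the same thread are not executed concurrently or in parallel, but one after another. (See https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472. Each StreamThread uses exactly one TaskManager, which internally uses AssignedStreamsTasks and AssignedStandbyTasks.)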
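The single-threaded loop can be sketched as follows. This is a simplified model, not Kafka's scheduler: runLoop and sampleBuffers are hypothetical, and the one-record-per-task-per-pass policy is an assumption made for clarity (actual batching details differ between Kafka versions). It uses three tasks instead of six to keep the output short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy model of one StreamThread's loop: a single thread visits its tasks in turn.
public class SingleThreadLoopDemo {

    static List<String> runLoop(Map<String, Queue<String>> buffersByTask) {
        List<String> executionOrder = new ArrayList<>();
        boolean didWork = true;
        while (didWork) {
            didWork = false;
            for (Map.Entry<String, Queue<String>> task : buffersByTask.entrySet()) {
                String record = task.getValue().poll(); // null when this task has nothing buffered
                if (record != null) {
                    // The whole Processor1 -> Processor2 -> Processor3 chain would run here,
                    // on this same thread, before the loop moves on to the next task.
                    executionOrder.add(task.getKey() + ":" + record);
                    didWork = true;
                }
            }
        }
        return executionOrder;
    }

    // Three tasks, each with two buffered records.
    static Map<String, Queue<String>> sampleBuffers() {
        Map<String, Queue<String>> buffers = new LinkedHashMap<>();
        for (int p = 0; p < 3; p++) {
            Queue<String> q = new ArrayDeque<>();
            q.add("r0");
            q.add("r1");
            buffers.put("0_" + p, q);
        }
        return buffers;
    }

    public static void main(String[] args) {
        runLoop(sampleBuffers()).forEach(System.out::println);
    }
}
```

The records come out interleaved (0_0:r0, 0_1:r0, 0_2:r0, 0_0:r1, ...), but every step runs on the same thread, so no two tasks ever execute at the same time.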
