Kafka Streams EOS模式-通知关闭



我有一个Kafka Streams应用程序,它在没有任何正确日志记录的情况下关闭,即使是在调试级别-

2020-12-18 14:25:36:875 +0000 [Thread-7] INFO  o.apache.kafka.streams.KafkaStreams:? - stream-client [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c] State transition from REBALANCING to PENDING_SHUTDOWN
2020-12-18 14:25:36:973 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] Informed to shut down
2020-12-18 14:25:36:974 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [trinity-client-pandprat-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
2020-12-18 14:25:36:974 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] DEBUG org.apache.kafka.clients.Metadata:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Updating last seen epoch from 0 to 0 for partition input-event-stream-client-pandprat-estestes5-0
2020-12-18 14:25:37:075 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] DEBUG org.apache.kafka.clients.Metadata:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Updated cluster metadata updateVersion 3 to MetadataCache{cluster=Cluster(id = ibD7yxLZQQSg24kQTlFnZA, nodes = [b-9.XXXXX.ap-southeast-1.amazonaws.com:9092 (id: 9 rack: apse1-az3), b-7.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 7 rack: apse1-az1), b-8.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 8 rack: apse1-az2), b-5.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 5 rack: apse1-az3), b-4.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 4 rack: apse1-az2), b-6.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 6 rack: apse1-az1), b-1.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 1 rack: apse1-az3), b-3.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 3 rack: apse1-az2), b-2.XXXX.ap-southeast-1.amazonaws.com:9092 (id: 2 rack: apse1-az1)], partitions = [Partition(topic = input-event-stream-client-pandprat-estestes5, partition = 0, leader = 9, replicas = [9,2,3], isr = [9,2,3], offlineReplicas = [])], controller = b-3.XXXXXX-msk-temp.k1lph1.c2.kafka.ap-southeast-1.amazonaws.com:9092 (id: 3 rack: apse1-az2))}
2020-12-18 14:25:37:172 +0000 [XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Revoking previously assigned partitions []
2020-12-18 14:25:37:172 +0000 [kafka-coordinator-heartbeat-thread | XXXXXX-estestes5-null] DEBUG o.a.k.c.c.i.AbstractCoordinator:? - [Consumer clientId=XXXXXX-estestes5-null-b9346744-6bb4-464d-aeaa-9311ab16ce2c-StreamThread-1-consumer, groupId=XXXXXX-estestes5-null] Heartbeat thread started

卡夫卡版本-2.3.1Broker版本-2.2.1没有引发异常。也可以看到类似的场景,其中应用程序也从RUNNING移动到PENDING_SHUTDOWN。请参阅下面的日志-

Jan 7, 2021 @ 15:03:12.253  2021-01-07 09:33:12:252 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Informed to shut down
Jan 7, 2021 @ 15:03:12.253  2021-01-07 09:33:12:252 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
Jan 7, 2021 @ 15:03:06.252  2021-01-07 09:33:06:252 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
Jan 7, 2021 @ 15:03:06.157  2021-01-07 09:33:06:157 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  c.a.t.e.EventSequenceProcessor:? - EventSequencer Processor getting initialized with bufferFlushInterval : 100, maxBufferSize : 10000, useExternalKnowledgeTime : true, forwardingLimit: 6000, forwardingIntervalInMillis: 6000
Jan 7, 2021 @ 15:03:06.155  2021-01-07 09:33:06:154 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Transitioning stream task 0_0 to running
Jan 7, 2021 @ 15:03:05.183  2021-01-07 09:33:05:183 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Transitioning stream task 0_0 to restoring
Jan 7, 2021 @ 15:03:05.180  2021-01-07 09:33:05:179 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Registering state store event-seq-state-store to its state manager
Jan 7, 2021 @ 15:03:05.168  2021-01-07 09:33:05:167 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.AssignedStreamsTasks:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Initializing stream tasks [0_0]
Jan 7, 2021 @ 15:03:05.163  2021-01-07 09:33:05:162 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] partition assignment took 604 ms.
Jan 7, 2021 @ 15:03:05.163      current active tasks: [0_0]
Jan 7, 2021 @ 15:03:05.163      previous active tasks: []
Jan 7, 2021 @ 15:03:05.163      current standby tasks: []
Jan 7, 2021 @ 15:03:05.163  
Jan 7, 2021 @ 15:03:04.953  2021-01-07 09:33:04:952 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Register global stores []
Jan 7, 2021 @ 15:03:04.653  2021-01-07 09:33:04:652 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Created state store manager for task 0_0 with the acquired state dir lock
Jan 7, 2021 @ 15:03:04.653  2021-01-07 09:33:04:653 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating producer client for task 0_0
Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] at state PARTITIONS_REVOKED: partitions [input-XXXXXXXX-tf-1test0107-0] assigned at the end of consumer rebalance.
Jan 7, 2021 @ 15:03:04.559      current suspended active tasks: []
Jan 7, 2021 @ 15:03:04.559      current suspended standby tasks: []
Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Adding assigned tasks as active: {0_0=[input-XXXXXXXX-tf-1test0107-0]}
Jan 7, 2021 @ 15:03:04.559  
Jan 7, 2021 @ 15:03:04.559  2021-01-07 09:33:04:558 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating tasks based on assignment.
Jan 7, 2021 @ 15:03:04.386  2021-01-07 09:33:04:386 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Assigned tasks to clients as {60f8a4be-b576-4ed0-9615-b91021cd76e0=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
Jan 7, 2021 @ 15:03:04.385  2021-01-07 09:33:04:385 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Assigning tasks [0_0] to clients {60f8a4be-b576-4ed0-9615-b91021cd76e0=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]} with number of replicas 0
Jan 7, 2021 @ 15:03:04.385  2021-01-07 09:33:04:384 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Completed validating internal topics {XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog=InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog, topicConfigs={}), numPartitions=1)} in partition assignor.
Jan 7, 2021 @ 15:03:04.385  2021-01-07 09:33:04:384 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Created state changelog topics [InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog, topicConfigs={}), numPartitions=1)] from the parsed topology.
Jan 7, 2021 @ 15:03:04.058  2021-01-07 09:33:04:058 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Topic XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog is unknown or not found, hence not existed yet.
Jan 7, 2021 @ 15:03:04.058  2021-01-07 09:33:04:058 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Going to create topic XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog with 1 partitions and config {cleanup.policy=compact}.
Jan 7, 2021 @ 15:03:04.049  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Starting to validate internal topics {XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog=InternalTopicMetadata(config=UnwindowedChangelogTopicConfig(name=XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog, topicConfigs={}), numPartitions=1)} in partition assignor.
Jan 7, 2021 @ 15:03:04.049  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Trying to check if topics [XXXXXXX-tf-1test0107-null-event-seq-state-store-changelog] have been created with expected number of partitions.
Jan 7, 2021 @ 15:03:03.955  2021-01-07 09:33:03:954 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Starting to validate internal topics {} in partition assignor.
Jan 7, 2021 @ 15:03:03.955  2021-01-07 09:33:03:954 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Completed validating internal topics {} in partition assignor.
Jan 7, 2021 @ 15:03:03.955  2021-01-07 09:33:03:955 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Created repartition topics [] from the parsed topology.
Jan 7, 2021 @ 15:03:03.953  2021-01-07 09:33:03:953 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Constructed client metadata {60f8a4be-b576-4ed0-9615-b91021cd76e0=ClientMetadata{hostInfo=null, consumers=[XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer-e7ce11d1-7b5a-478f-8aaa-1e46df67bbf3], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions.
Jan 7, 2021 @ 15:03:03.952  2021-01-07 09:33:03:952 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.StreamsPartitionAssignor:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer] Current minimum supported version remains at 4, last seen supported version was 4
Jan 7, 2021 @ 15:03:00.355  2021-01-07 09:33:00:354 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopologyBuilder:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] found [input-XXXXXXXX-tf-1test0107] topics possibly matching regex
Jan 7, 2021 @ 15:03:00.355  2021-01-07 09:33:00:354 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.i.InternalTopologyBuilder:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] updating builder with SubscriptionUpdates{updatedTopicSubscriptions=[input-XXXXXXXX-tf-1test0107]} topic(s) with possible matching regex subscription(s)
Jan 7, 2021 @ 15:03:00.353      current assigned active tasks: []
Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from STARTING to PARTITIONS_REVOKED
Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Suspending all active tasks [] and standby tasks []
Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:352 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] at state STARTING: partitions [] revoked at the beginning of consumer rebalance.
Jan 7, 2021 @ 15:03:00.353      suspended standby tasks: []
Jan 7, 2021 @ 15:03:00.353      current assigned standby tasks: []
Jan 7, 2021 @ 15:03:00.353  
Jan 7, 2021 @ 15:03:00.353  2021-01-07 09:33:00:353 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] partition revocation took 1 ms.
Jan 7, 2021 @ 15:03:00.353      suspended active tasks: []
Jan 7, 2021 @ 15:02:58.752  Event Sequencer Server started, listening on 2301
Jan 7, 2021 @ 15:02:57.151  2021-01-07 09:32:57:150 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] State transition from CREATED to STARTING
Jan 7, 2021 @ 15:02:57.151  2021-01-07 09:32:57:150 +0000 [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Starting
Jan 7, 2021 @ 15:02:57.151  2021-01-07 09:32:57:151 +0000 [main] INFO  c.a.t.c.IngestionTopicConsumer:? - StreamThread Metadata : ThreadMetadata{threadName=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1, threadState=STARTING, activeTasks=[], standbyTasks=[], consumerClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-consumer, restoreConsumerClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1-restore-consumer, producerClientIds=[], adminClientId=XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-admin} 
Jan 7, 2021 @ 15:02:57.051  2021-01-07 09:32:57:050 +0000 [main] WARN  o.a.k.c.consumer.ConsumerConfig:? - The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
Jan 7, 2021 @ 15:02:57.051  2021-01-07 09:32:57:050 +0000 [main] WARN  o.a.k.c.consumer.ConsumerConfig:? - The configuration 'admin.retries' was supplied but isn't a known config.
Jan 7, 2021 @ 15:02:56.760  2021-01-07 09:32:56:760 +0000 [main] DEBUG o.a.k.s.p.i.InternalTopicManager:? - stream-thread [main] Configs:
Jan 7, 2021 @ 15:02:56.760  
Jan 7, 2021 @ 15:02:56.753  2021-01-07 09:32:56:752 +0000 [main] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating consumer client
Jan 7, 2021 @ 15:02:55.857  2021-01-07 09:32:55:856 +0000 [main] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tf-1test0107-null-60f8a4be-b576-4ed0-9615-b91021cd76e0-StreamThread-1] Creating restore consumer client
Jan 7, 2021 @ 15:02:55.350  2021-01-07 09:32:55:349 +0000 [main] INFO  c.a.t.c.IngestionTopicConsumer:? - Initializing Ingestion Topic Consumer
Jan 7, 2021 @ 15:02:55.350  2021-01-07 09:32:55:349 +0000 [main] INFO  com.arcesium.trinity.EventSequencer:? - Initializing Ingestion Topic Consumer
Jan 7, 2021 @ 15:02:53.551      Source: input-topic (topics: [input-XXXXXXXX-tf-1test0107])
Jan 7, 2021 @ 15:02:53.551      Processor: event-sequencer (stores: [event-seq-state-store])
Jan 7, 2021 @ 15:02:53.551      Sink: output-event-topic (topic: output-XXXXXXXX-tf-1test0107)
Jan 7, 2021 @ 15:02:53.551        <-- event-sequencer
Jan 7, 2021 @ 15:02:53.551  
Jan 7, 2021 @ 15:02:53.551        <-- input-topic
Jan 7, 2021 @ 15:02:53.551  
Jan 7, 2021 @ 15:02:53.551  2021-01-07 09:32:53:460 +0000 [main] INFO  c.a.t.config.EventSequencerConfig:? - Topology initialized: Topologies:
Jan 7, 2021 @ 15:02:53.551     Sub-topology: 0
Jan 7, 2021 @ 15:02:53.551        --> event-sequencer
Jan 7, 2021 @ 15:02:53.551        --> output-event-topic
Jan 7, 2021 @ 15:02:30.854  Listening for transport dt_socket at address: 2311

我还看到,每当触发再平衡时,就会出现关闭。请参阅下面的日志-

Jan 7, 2021 @ 08:43:13.555  2021-01-07 03:13:13:554 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] Informed to shut down
Jan 7, 2021 @ 08:43:13.555  2021-01-07 03:13:13:554 +0000 [kafka-streams-close-thread] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
Jan 7, 2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamTask:? - task [0_0] Committing
Jan 7, 2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.i.RecordCollectorImpl:? - task [0_0] Flushing producer
Jan 7, 2021 @ 08:42:50.009  2021-01-07 03:12:50:009 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager:? - task [0_0] Flushing all stores registered in the state manager
Jan 7, 2021 @ 08:42:50.008      current assigned active tasks: [0_0]
Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.TaskManager:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] Suspending all active tasks [0_0] and standby tasks []
Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamTask:? - task [0_0] Suspending
Jan 7, 2021 @ 08:42:50.008      current assigned standby tasks: []
Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
Jan 7, 2021 @ 08:42:50.008  2021-01-07 03:12:50:008 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] DEBUG o.a.k.s.p.internals.StreamThread:? - stream-thread [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] at state RUNNING: partitions [input-XXXXXXX-tzobnwpj-0] revoked at the beginning of consumer rebalance.
Jan 7, 2021 @ 08:42:50.008  
Jan 7, 2021 @ 08:42:49.842  2021-01-07 03:12:49:842 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0
Jan 7, 2021 @ 08:42:49.652  2021-01-07 03:12:49:652 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0
Jan 7, 2021 @ 08:42:49.651  2021-01-07 03:12:49:650 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0
Jan 7, 2021 @ 08:42:49.649  2021-01-07 03:12:49:649 +0000 [XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1] WARN  o.a.k.c.consumer.internals.Fetcher:? - [Consumer clientId=XXXXXXX-tzobnwpj-null-adce3dfa-92cb-4f33-a806-badadffaadc7-StreamThread-1-consumer, groupId=XXXXXXX-tzobnwpj-null] Received unknown topic or partition error in fetch for partition input-XXXXXXX-tzobnwpj-0

有人知道为什么会发生这种事吗?

日志行"Informed to shut down"表明调用了StreamThreadshutdown方法。这只能从两个地方调用:-

一个-KafkaStreamclose方法-用于实际完全关闭Kafka流(最终关闭所有StreamThreads(但是您的调试日志并没有指示完整的Kafka流正在关闭。如果是这种情况,你会在下面的日志

log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);

两个-RebalanceListener-onPartitionsAssigned方法

if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
streamThread.shutdown();
return;
} 

这可能意味着由于INCOMPLETE_SOURCE_TOPIC_METADATA,您的StreamThread正在接收关闭请求。这可能是一个暂时的问题,也可能是由于元数据不完整(如不存在或主题名称拼写错误等(而导致的永久性故障。

相关内容

  • 没有找到相关文章

最新更新