没有记录 on Kinesis 流暂停消息 1 秒



我正在使用弹簧 kinesis 粘合剂在批处理模式下使用消息

有时我无法使用流中的消息。这种情况并非一直都在发生

在 sequenceNumber [null] 上没有 [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49594358705006691330463332232285735104253344290967650306', timestamp=null, stream='mystream-1', shard='shardId-0000000000000', reset=false}, state=CONSUME}] 的记录。暂停消耗 [1000] 毫秒。

我不断收到此消息,并且消息未被消耗。

我的 Conf 如下所示

 spring:
  cloud:
    stream:
      bindings:
        input:
          group: mygroup
          destination: mystream-1
          content-type: application/json
        output:
          destination: mystream-2
          content-type: application/json
      kinesis:        
        bindings:
          input:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 60000
              consumer-backoff: 1000
        binder:
          headers: x-item_id,x-message_type
          locks:
            table: lTLocks
            leaseDuration: 30
            refreshPeriod: 3000
          checkpoint:
            table: lTCheckPoints

流中有消息,但我没有间歇性地消费。你能帮忙吗?

我认为您在 Kinesis 流中的检查点存储和分片包含不同的序列号。

请考虑在开始从流中使用清理lTCheckPoints表。

我并不是说你需要一直这样做,但至少为了这个干净的测试环境。

最新更新