我目前在kafka流上执行我的第一步,我很难理解kafka应用程序如何存储其状态。我想打印kstream的内容而不需要更新偏移量,感觉这不是我应该做的事情,但我正在努力理解为什么:
def rawPlanningStream(
builder: StreamsBuilder,
topicName: String
): KStream[String, Planning] =
builder.stream(topicName)(Consumed.`with`(Serdes.String, Planning.serde))
def printPlanning(
key: String,
value: Planning
) = {
val logger = LoggerFactory.getLogger("PlanningEventSyncLogger")
logger.warn(s"Planning: $key, $value")
}
def process(
builder: StreamsBuilder,
rawTopic: String
) = {
val raw_planning_stream = PlanningEventSync.rawPlanningStream(
builder,
rawTopic
)
raw_planning_stream.peek((k,v) => printPlanning(k,v))
//Here I would like to perform an operation on raw_planning_stream
//but offset is already "wrong" because of the peek done earlier
}
第一次开始处理时,主题的内容按预期打印,如果我再次启动它,它不再打印任何内容,因为偏移量已更新。
我的问题是,是否有可能执行"非侵入性"操作,如打印,以使偏移量保持原样?
(注意:我设法使用——reset-offset -to-earliest from kafka-consumer-groups.sh在我的组上手动重置偏移量,但我希望能够以编程方式执行操作而不改变我的消费者组的偏移量)
如果你不能设置enable.auto.commit=false
,那么另一个选择是设置application.id="<some random UUID>"
,这样每次运行应用程序时,它都会创建一个新的消费者组,从auto.offset.reset
设置