Spring Cloud Kafka Streams: windowed session aggregation fails with custom Serdes



I am relatively new to Kafka Streams and Spring Cloud Stream, and I'm running into trouble with the windowed aggregation features.

What I want to do is:

  1. Take the initial stream of UserInteractionEvents and group it by userProjectId (a String)
  2. Window those events into sessions with a 15-minute inactivity gap
  3. Aggregate the windowed events into a custom Session object
  4. Map those Session objects into another custom UserSession object

My code looks like this:

@EnableBinding(KafkaStreamsProcessor::class)
inner class SessionProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    fun process(input: KStream<*, UserInteractionEvent>): KStream<*, UserSession> {
        return input
            .groupBy({ _, v -> v.userProjectId }, Serialized.with(Serdes.String(), UserInteractionEventSerde()))
            .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(15)))
            .aggregate(
                Initializer<Session>(::Session),
                Aggregator<String, UserInteractionEvent, Session> { _, event, session -> session.interactions + event.interaction; session },
                Merger<String, Session> { _, session1, session2 -> Session.merge(session1, session2) },
                Materialized.`as`<String, Session, SessionStore<Bytes, ByteArray>>("windowed-sessions")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(SessionSerde()))
            .toStream()
            .map { windowed, session ->
                KeyValue(windowed.key(),
                    UserSession(windowed.key(),
                        session.interactions,
                        Instant.ofEpochSecond(windowed.window().start()),
                        Instant.ofEpochSecond(windowed.window().end())))
            }
    }
}
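As an editorial aside, separate from the serde error being asked about, two details in the snippet above look suspect: Kotlin's `List.plus` returns a new list (so the aggregator's `session.interactions + event.interaction; session` discards the accumulated result), and `window().start()`/`end()` return epoch milliseconds, not seconds. Both behaviors can be demonstrated in plain Kotlin:

```kotlin
import java.time.Instant

fun main() {
    // 1. List.plus returns a NEW list; the receiver is unchanged, so an
    //    expression like `session.interactions + event.interaction` whose
    //    result is thrown away accumulates nothing.
    val interactions = listOf("click")
    interactions + "scroll"                    // result silently discarded
    check(interactions == listOf("click"))     // original list untouched
    val grown = interactions + "scroll"        // keep the result instead
    check(grown == listOf("click", "scroll"))

    // 2. Window start/end timestamps are epoch MILLISECONDS, so
    //    Instant.ofEpochSecond() would land tens of thousands of years in
    //    the future; Instant.ofEpochMilli() is the right factory.
    val windowStartMs = 1_500_000_000_000L
    check(Instant.ofEpochMilli(windowStartMs).epochSecond == 1_500_000_000L)
}
```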

I seem to be hitting a problem in the aggregation step: a ClassCastException is thrown when the windowed session store is flushed, and I'm not sure how to proceed from here. I would appreciate it if someone could point out where I've gone wrong, or point me to documentation on using session windows with custom Serdes!

Thanks in advance!

Full stack trace below:

Exception in thread "default-dc0af3aa-8d8d-4b51-b0de-cdeb2dd83db6-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store windowed-sessions
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapValuesProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
    at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:165)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    ... 14 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:90)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    ... 45 more

My configuration:

spring.cloud.stream.kafka.streams.bindings:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde
spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      headerMode: raw
  output:
    destination: test-session
    producer:
      headerMode: raw

I found a few issues with your configuration.

The way the default Serdes are configured should be changed as follows:

spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings:
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde

It looks like you intend native Serdes to handle all of the (de)serialization. You need to opt in to that in the configuration, because by default the binder performs the input/output (de)serialization itself:

spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      useNativeDecoding: true
  output:
    destination: test-session
    producer:
      useNativeEncoding: true
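For reference, here is how those two corrected fragments might look when combined into a single application.yml (keeping the Serde class names from the question; a sketch, not verified against your project):

```yaml
spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings:
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde
spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      useNativeDecoding: true   # let the Kafka Streams Serdes decode input
  output:
    destination: test-session
    producer:
      useNativeEncoding: true   # let the Kafka Streams Serdes encode output
```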

If the problem persists after these changes, please create a small sample project that reproduces it and share it with us on GitHub; we will take a look.
