I am trying to create a leftJoin in Kafka Streams. It works for about 10 records, then it crashes with an exception caused by a NullPointerException. The code looks like this:
private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();
    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);
    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString()
                    + ",Nachname=" + verkaeufer.getNachname().toString()
                    + "," + tutti.toString()),
            Serdes.String(), Serdes.String());
    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);
    return new KafkaStreams(builder, streamsConfig);
}
The StreamsConfig looks like this:
private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000");
    return props;
}
Full stack trace:
22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: java.lang.NullPointerException: null
at java.lang.String.<init>(String.java:143)
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260)
... 14 common frames omitted
Update:
This is what the GsonDeserializer looks like:
public class GsonDeserializer<T> implements Deserializer<T> {
    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class";
    private Class<T> deserializedClass;
    private Gson gson = new GsonBuilder().create();

    public GsonDeserializer() {}

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));
        try {
            if (deserializedClass == null) {
                deserializedClass = (Class<T>) Class.forName(clsName);
            }
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
            System.out.println(e.getMessage());
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {}
}
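As long as the cache is not flushed, the deserializer is never called. That is why it does not fail at first. You can extend the time until the failure through the cache size parameter and the commit interval (the cache is flushed on commit).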
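Turning that reasoning around, a smaller cache and a shorter commit interval should make the failure show up sooner, which can help while debugging. The following is only a sketch under that assumption: the class and method names are hypothetical, the two string keys are the property names behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, and the exact timing will depend on the Kafka Streams version.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original code.
class FlushSettings {
    // Returns a copy of the given properties with the cache size and
    // commit interval overridden; the input is left unmodified.
    static Properties withFlushSettings(Properties base, long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        // Property name behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        // Property name behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }
}
```

For example, FlushSettings.withFlushSettings(getProperties(), 1000L, 100L) would keep the cache size from the question and commit (and therefore flush) roughly every 100 ms.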
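Looking at your code for GsonDeserializer, the NPE appears to come from new String(bytes): the String constructor cannot take null as an argument. Your deserializer code must guard against bytes == null and should return null directly in that case.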
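A minimal sketch of that guard is shown below. To keep it self-contained it does not depend on Kafka or Gson: the class name NullSafeDeserializer and the parser function are hypothetical stand-ins, where the parser plays the role of gson.fromJson(json, deserializedClass). The explicit UTF-8 charset is also an assumption; the original code uses the platform default.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for GsonDeserializer<T>, reduced to the null guard.
class NullSafeDeserializer<T> {
    // Stand-in for json -> gson.fromJson(json, deserializedClass)
    private final Function<String, T> parser;

    NullSafeDeserializer(Function<String, T> parser) {
        this.parser = parser;
    }

    public T deserialize(String topic, byte[] bytes) {
        // A null payload cannot be passed to new String(...), so return
        // null directly instead of throwing a NullPointerException.
        if (bytes == null) {
            return null;
        }
        // Assumption: the payload is UTF-8 encoded JSON.
        return parser.apply(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the real GsonDeserializer the same two-line check at the top of deserialize(String s, byte[] bytes) should be enough; the rest of the class can stay as it is.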