stateStore.get()
在KStream
上从transform()
使用时返回不一致的结果。它返回null,即使相应的键值已被put()
放入存储区。
有人能解释KeyValueStore<>
的这种行为吗?
@Component
public class StreamProcessor {
@StreamListener
public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String, JsonNode> inputStream) {
KStream<String, JsonNode> joinedEvents = inputStream
.selectKey((key, value) -> computeKey(value))
.transform(
() -> new SelfJoinTransformer((v1, v2) -> join(v1, v2), "join_store"),
"join_store"
);
joinedEvents
.foreach((key, value) -> System.out.format("%s,joined=%bn",key, value.has("right")));
}
private JsonNode join(JsonNode left, JsonNode right) {
((ObjectNode) left).set("right", right);
return left;
}
}
public class SelfJoinTransformer implements Transformer<String, JsonNode, KeyValue<String, JsonNode>> {
private KeyValueStore<String, JsonNode> stateStore;
private ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner;
private String storeName;
public SelfJoinTransformer(ValueJoiner<JsonNode, JsonNode, JsonNode> valueJoiner, String storeName) {
this.storeName = storeName;
this.valueJoiner = valueJoiner;
}
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String, JsonNode>) context.getStateStore(storeName);
}
@Override
public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
JsonNode oldValue = stateStore.get(key);
if (oldValue != null) { //this condition rarely holds true
stateStore.delete(key);
System.out.format("%s,joinedn", key);
return KeyValue.pair(key, valueJoiner.apply(oldValue, value));
}
stateStore.put(key, value);
return null;
}
}
消息似乎正在消失(假设标点符号没有删除它们(的原因是您使用了KStream::selectKey(…(,它更改了键,但不进行重新分区您可能会在错误的分区中查找密钥。
看看以下场景:
- 消息1:
k1
、v1
(partition0
( - 消息2:
k2
、v2
(partition1
(
假设消息被放在不同的分区中(因为密钥(选择密钥后:k1 -> k
、k2 -> k
- 消息1:
k
、v1
- 消息2:
k
、v2
操作selectKey
是无状态的,因此不会将消息发送到下游(主题(,也不会发生重新分区。对于第一条消息:将关键字-k的值放入存储区(partition0(当第二条消息到达时:对于密钥-k,没有消息,因为它是不同的分区(分区1(