我正在从Topic中消费数据并将其写入表:
final KStream<String, AuftragGemeindeschluessel> myStream = kStreamBuilder.stream(
sabKafkaProperties.getAuftragGemeindeSchluesselTopicName(),
Consumed.with(
org.apache.kafka.common.serialization.Serdes.String(),
Serdes.getMyObjectSerde())
.withName("AUFTRAG_GEMEINDESCHLUESSEL"));
myStream.toTable(Named.as("MY_STATE_STORE"), getMaterializedViewForStateStore());
我总是有一个相同的键,这意味着新值总是覆盖我的KTable中的旧值,但我必须首先比较旧值和新值。
我怎么能这么做?
我对这个问题的解决方案是这样的:
public class MyValueTransformer implements ValueTransformer{
private KeyValueStore<String, MyObject> stateStore;
@Override
public void init(ProcessorContext context) {
stateStore = context.getStateStore(STATE_STORE_NAME);
}
@Override
public List<OtherObject> transform(OtherObj currentObj) {
//fetch last saved value from store
final OtherObj otherObj = stateStore.get("myKey");
/*
* do some logik for computing current state-store value, before it will be overwritten with "toTable"
*/
//save
stateStore.put("myKey", currentObj);
return new ArrayList();
}
}