KafkaStreams:访问更新前的KTable值



我正在从Topic中消费数据并将其写入表:

final KStream<String, AuftragGemeindeschluessel> myStream = kStreamBuilder.stream(
sabKafkaProperties.getAuftragGemeindeSchluesselTopicName(),
Consumed.with(
org.apache.kafka.common.serialization.Serdes.String(),
Serdes.getMyObjectSerde())
.withName("AUFTRAG_GEMEINDESCHLUESSEL"));
myStream.toTable(Named.as("MY_STATE_STORE"), getMaterializedViewForStateStore());

我总是有一个相同的键,这意味着新值总是覆盖我的KTable中的旧值,但我必须首先比较旧值和新值。

我怎么能这么做?

我对这个问题的解决方案是这样的:

public class MyValueTransformer implements ValueTransformer{
private KeyValueStore<String, MyObject> stateStore;

@Override
public void init(ProcessorContext context) {
stateStore = context.getStateStore(STATE_STORE_NAME);
}

@Override
public List<OtherObject> transform(OtherObj currentObj) {            
//fetch last saved value from store
final OtherObj otherObj = stateStore.get("myKey");
/*
* do some logik for computing current state-store value, before it will be overwritten with "toTable"
*/

//save 
stateStore.put("myKey", currentObj);
return new ArrayList();
}
}

相关内容

  • 没有找到相关文章

最新更新