我目前正在开发一款Kafka Streams应用程序,该应用程序使用我们数据库中的数据丰富传入事件。富集数据存储在使用Debezium不断更新的主题中。有些富集很容易实现,因为它们只是来自事件id的equi-join/left-join。但其他富集需要根据传入事件时间戳计算一个值:
假设我的传入事件主题具有以下模式:
user_id: Long
timestamp: Instant
然后我需要将此事件映射到以下输出:
user_id: Long
has_planned_meetings_in_the_future: Boolean
会议表存储在一个单独的主题中,具有以下记录结构:
user_id: Long
meeting_date: Instant
因此,对于每个事件,我都需要在会议主题中查找它们是否是该特定用户的记录,并且会议日期大于当前时间戳。
如何做到这一点?
一种可行的方法是在应用程序中使用会议主题,并将会议存储在状态存储中。
然后,您可以使用所描述的条件高效地查询状态存储。
这里有一个存储会议的简单示例:
public class MyMeetingsProcessor implements Processor<Object, Meeting> {
private String meetingsKeyStore = "meetings-key-store";
private KeyValueStore<Object, Meeting> meetings;
public void init(ProcessorContext context) {
meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
}
public void process(Object key, Meeting value) {
meetings.put(key, value);
}
}
要在使用事件时查询状态存储,可以执行以下操作:
public class MyEventsProcessor implements Processor<Object, Meeting> {
private String meetingsKeyStore = "meetings-key-store";
private KeyValueStore<Object, Meeting> meetings;
public void init(ProcessorContext context) {
meetings = (KeyValueStore<Object, List<String>>) context.getStateStore(meetingsKeyStore);
}
public void process(Object key, Event value) {
Meeting meeting = meetings.get(key);
if (meeting != null) {
// do something fun
}
}
}