使用 Kafka 翻转窗口进行查询时返回空数据



我正在尝试查询状态存储以在 5 分钟的窗口中获取数据。为此,我正在使用tumbling window.添加了 REST 来查询数据。 我已经stream A它消耗来自topic1的数据并执行一些转换并将键值输出到topic2。 现在在stream B,我正在对topic2数据进行翻转窗口操作。当我运行代码并使用 REST 查询时,我在浏览器上看到空数据。我可以看到状态存储中的数据在流动。

我观察到的是,我不是topic2stream A获取数据,而是使用生产者类将数据注入topic2并能够从浏览器查询数据。但是当topic2stream A获取数据时,我得到的是空数据。

这是我stream A代码:

public static void main(String[] args) {    
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("topic1");
KStream<String, String> output = source
.map((k,v)->
{                                                                       
Map<String, Object> Fields = new LinkedHashMap<>();
Fields.put("FNAME","ABC");
Fields.put("LNAME","XYZ");
Map<String, Object> nFields = new LinkedHashMap<>();
nFields.put("ADDRESS1","HY");
nFields.put("ADDRESS2","BA");               
nFields.put("addF",Fields);
Map<String, Object> eve = new LinkedHashMap<>();                            
eve.put("nFields", nFields);
Map<String, Object> fevent = new LinkedHashMap<>();
fevent.put("eve", eve);             
LinkedHashMap<String, Object> newMap = new LinkedHashMap<>(fevent);                                 
return new KeyValue<>("JAY1234",newMap.toString());  
});
output.to("topic2");        

}

这是我的stream B代码(发生翻转窗口操作):

public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> eventStream = builder.stream("topic2");
eventStream.groupByKey()
.windowedBy(TimeWindows.of(300000))         
.reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("TumblingWindowPoc"));
final Topology topology = builder.build();      
KafkaStreams streams = new KafkaStreams(topology, props);   
streams.start();      
}

休息代码 :

@GET()
@Path("/{storeName}/{key}")
@Produces(MediaType.APPLICATION_JSON)
public List<KeyValue<String, String>> windowedByKey(@PathParam("storeName") final String storeName,
@PathParam("key") final String key) {
final ReadOnlyWindowStore<String, String> store = streams.store(storeName,
QueryableStoreTypes.<String, String>windowStore());
if (store == null) {
throw new NotFoundException();      }
long timeTo = System.currentTimeMillis(); 
long timeFrom = timeTo - 30000;         
final WindowStoreIterator<String> results = store.fetch(key, timeFrom, timeTo);
final List<KeyValue<String,String>> windowResults = new ArrayList<>();
while (results.hasNext()) {
final KeyValue<Long, String> next = results.next();     
windowResults.add(new KeyValue<String,String>(key + "@" + next.key, next.value));
}
return windowResults;
}

这就是我的键值数据的样子:

JAY1234 {eve = {nFields = {ADDRESS1 = HY,ADDRESS2 = BA,Fields = {FNAME = ABC,LNAME = XYZ,}}}}

我应该能够在使用 REST 查询时获取数据。任何帮助将不胜感激。 谢谢!

获取窗口时间应该在窗口开始之前。因此,如果您想要过去 30 秒的数据,您可以减去用于获取的窗口持续时间,例如 timeTo - 30000 - 300000,然后从整个窗口数据中过滤掉所需的事件

事件

相关内容

  • 没有找到相关文章

最新更新