我可以<T>在不使用内置的print()函数的情况下在Apache Flink中打印DataSteam的单个元素吗?



我正在尝试打印在 Flink 中检测到的警告值

为每个匹配的警告模式生成温度警告

DataStream<TemperatureEvent> warnings = tempPatternStream.select(
(Map<String, MonitoringEvent> pattern) -> {
TemperatureEvent first = (TemperatureEvent) pattern.get("first");

return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
}
);

// Print the warning and alert events to stdout

warnings.print();

我得到的输出如下(根据事件源函数的字符串(

Rack id = 99 and temprature = 76.0

有人可以告诉我,如果有办法在不使用打印的情况下打印数据流的值?例如,如果我只想打印温度,如何访问 DataStream 中的单个元素。

提前致谢

我已经找到了一种访问单个元素的方法,让我们假设我们有一个数据流

HeartRate<Integer,Integer>

它有 2 个属性

private Integer Patient_id ;
private  Integer HR;

使用自定义函数生成数据流

DataStream<HREvent> hrEventDataStream = envrionment
.addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

假设您已经使用自定义函数生成了数据蒸汽,现在我们可以打印心率事件的各个元素的值,如下所示

hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
@Override
public Integer getKey(HREvent hrEvent) throws Exception {
return hrEvent.getPatient_id();
}
})
.window(TumblingEventTimeWindows.of(milliseconds(10)))
.apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
@Override
public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {
for(HREvent in : iterable){
System.out.println("Patient id  = " + in.getPatient_id() + " Heart Rate  = " + in.getHR());
}//for
}//apply
});

希望对您有所帮助!

最新更新