Flink keyedStream count state



I have configured Flink to read from a Kafka topic; the messages are alerts coming from devices.

GenericEvent and DeviceState are simple POJOs:

class GenericEvent {
    String deviceName, region, alert;
    Long timestamp;
}

class DeviceState {
    String deviceName, status;
    Long timestamp;
}

EventEvaluator is a function that determines the device state (UP or DOWN).
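The actual EventEvaluator is not shown here; for illustration only, a minimal keyed evaluator that maps the alert field to an UP/DOWN status and remembers the last status in keyed state could look roughly like this (since the pipeline below uses it with flatMap, a RichFlatMapFunction is sketched; the "DEVICE_DOWN" alert value and the GenericEvent getters are assumptions, and the DeviceState fields are set directly for brevity):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative sketch only - the real evaluator logic is not part of the question.
public static class EventEvaluator extends RichFlatMapFunction<GenericEvent, DeviceState> {

    // Keyed state: last known status of the current device (the stream is keyed by device name).
    private transient ValueState<String> lastStatus;

    @Override
    public void open(Configuration parameters) {
        lastStatus = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastStatus", String.class));
    }

    @Override
    public void flatMap(GenericEvent event, Collector<DeviceState> out) throws Exception {
        // Assumption: a "DEVICE_DOWN" alert means the device went down, anything else means up.
        String newStatus = "DEVICE_DOWN".equals(event.getAlert()) ? "DOWN" : "UP";
        if (!newStatus.equals(lastStatus.value())) { // emit only on a status change
            lastStatus.update(newStatus);
            DeviceState state = new DeviceState();
            state.deviceName = event.getDeviceName();
            state.status = newStatus;
            state.timestamp = event.getTimestamp();
            out.collect(state);
        }
    }
}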

This is how far I have gotten:

DataStream<GenericEvent> genericStream = env.addSource(kafkaSource);

SingleOutputStreamOperator<DeviceState> deviceStateStream =
        genericStream.keyBy(GenericEvent::getDeviceName)
                     .flatMap(new EventEvaluator());

SingleOutputStreamOperator<DeviceState> downNodes = deviceStateStream.filter(nodeDownFilter);

private static FilterFunction<DeviceState> nodeDownFilter = new FilterFunction<DeviceState>() {
    @Override
    public boolean filter(DeviceState deviceState) {
        return "DOWN".equals(deviceState.getStatus());
    }
};

Now I need to count the number of devices that are in the DOWN state. How can I do that? After that, I also need to count the DOWN devices in each region (a region can be South/North/East/West). Any suggestions?

We can use FlinkKafkaConsumer.assignTimestampsAndWatermarks.

See the Kafka Consumers and Timestamp Extraction/Watermark Emission section:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

FlinkKafkaConsumer<GenericEvent> backupConsumer =
        new FlinkKafkaConsumer<>(CONSUMER_TOPIC, new DeserializationSchema(), properties); // replace with your concrete DeserializationSchema<GenericEvent>
backupConsumer.setStartFromLatest();
backupConsumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<GenericEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

DataStream<GenericEvent> backupDataStream = env.addSource(backupConsumer);

DataStream<DeviceState> result = backupDataStream.flatMap(new FlatMapFunction<GenericEvent, DeviceState>() {
    @Override
    public void flatMap(GenericEvent value, Collector<DeviceState> out) {
        if (value != null) {
            // Add your logic here to derive a DeviceState from the event
            out.collect(deviceState);
        }
    }
});

CassandraSink.addSink(result)
        .setHost(CASSANDRA_HOST, CASSANDRA_PORT)
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();
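
To get the per-region count asked in the question, one option (a sketch, not tested) is to key the question's downNodes stream by region and count the DOWN events per event-time window, relying on the watermarks assigned above. This assumes DeviceState also carries the device's region (e.g. copied over from GenericEvent) and exposes getRegion(); the one-minute window is an arbitrary choice:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Count of DOWN events per region for each one-minute event-time window.
DataStream<Tuple2<String, Long>> downPerRegion = downNodes
        .map(state -> Tuple2.of(state.getRegion(), 1L))         // (region, 1) per DOWN event; getRegion() is assumed
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)                                        // one group per region (South/North/East/West)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))    // window length is an arbitrary choice
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));         // sum the 1s -> count per region per window

An overall count that is not split by region can be obtained the same way with windowAll, or by incrementing a counter held in keyed state inside a KeyedProcessFunction if no windowing is wanted.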
