Computing an average over a window duration with Flink DataStream



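I am using the Flink DataStream API. My events come from racks, and I want to calculate the average temperature grouped by rack ID. My window duration is 40 seconds and the window slides every 10 seconds. Below is my code, which computes the sum of the temperatures per rackId every 10 seconds, but now I want to compute the average temperature instead: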

static Properties properties = new Properties();

public static Properties getProperties() {
    properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
    properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
    //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
    //properties.setProperty("group.id", "akshay");
    properties.setProperty("auto.offset.reset", "earliest");
    return properties;
}

@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties props = Program.getProperties();
    DataStream<TemperatureEvent> dstream = env
        .addSource(new FlinkKafkaConsumer09<TemperatureEvent>(
            "TemperatureEvent", new TemperatureEventSchema(), props))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
    DataStream<TemperatureEvent> ds1 = dstream
        .keyBy("rackId")
        .timeWindow(Time.seconds(40), Time.seconds(10))
        .sum("temperature");
    env.execute("Temperature Consumer");
}

How can I compute the average temperature in the example above?
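
For context, a 40-second window that slides every 10 seconds means each event contributes to 40 / 10 = 4 overlapping windows, so a result is emitted every 10 seconds covering the previous 40 seconds. The following is a minimal plain-Java sketch of that assignment, not Flink code. It assumes non-negative epoch-millisecond timestamps and windows aligned to multiples of the slide; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it does not use any Flink API.
final class SlidingWindows {
    // Returns the start timestamps (ms) of every window [start, start + sizeMs)
    // that contains timestampMs, newest window first.
    // Assumes timestampMs >= 0 and windows aligned to multiples of slideMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that already contains the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, `SlidingWindows.windowStarts(25_000L, 40_000L, 10_000L)` yields four window starts, which is why the per-window function runs once per rack for each of the four windows an event falls into.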

As far as I know, you need to write the averaging function yourself. You can find an example here:

In your case, you would replace .sum("temperature") with something like .apply(new Avg()) and implement the Avg class:

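import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// keyBy("rackId") with a field name produces a key of type Tuple, and timeWindow(...) produces TimeWindow.
public class Avg implements WindowFunction<TemperatureEvent, TemperatureEvent, Tuple, TimeWindow> {
  @Override
  public void apply(Tuple key, TimeWindow window, Iterable<TemperatureEvent> values, Collector<TemperatureEvent> out) {
    // Assumption: getTemperature() returns a numeric value and setTemperature() accepts a double.
    double sum = 0.0;
    int count = 0;
    TemperatureEvent result = null;
    for (TemperatureEvent value : values) {
        if (result == null) {
            result = value; // reuse the first event of the window to carry the result
        }
        sum += value.getTemperature();
        count++;
    }
    if (count == 0) {
        return; // empty window: nothing to emit, and this avoids dividing by zero
    }
    result.setTemperature(sum / count);
    out.collect(result);
  }
}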

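Note: if your function can be called on an empty window (for example, with a custom trigger), you need to check that values is non-empty before reading its first element or dividing by the count.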
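
The averaging logic itself can be tried out without a Flink cluster. The sketch below is plain Java and only illustrates the sum-and-count idea together with the empty-window guard mentioned in the note. `AverageAccumulator` and its methods are hypothetical names, not Flink classes, and temperatures are assumed to be doubles.

```java
import java.util.OptionalDouble;

// Hypothetical helper for illustration only; it does not use any Flink API.
final class AverageAccumulator {
    private double sum = 0.0;
    private long count = 0L;

    // Folds one temperature reading into the running sum and count.
    void add(double temperature) {
        sum += temperature;
        count++;
    }

    // Combines two partial accumulators, e.g. results computed separately.
    AverageAccumulator merge(AverageAccumulator other) {
        AverageAccumulator merged = new AverageAccumulator();
        merged.sum = this.sum + other.sum;
        merged.count = this.count + other.count;
        return merged;
    }

    // Returns an empty result for an empty window instead of dividing by zero.
    OptionalDouble result() {
        return count == 0 ? OptionalDouble.empty() : OptionalDouble.of(sum / count);
    }

    // Convenience method: averages all readings of one window.
    static OptionalDouble averageOf(Iterable<Double> temperatures) {
        AverageAccumulator acc = new AverageAccumulator();
        for (double t : temperatures) {
            acc.add(t);
        }
        return acc.result();
    }
}
```

Keeping only a sum and a count means the state per window stays constant in size no matter how many events arrive. As far as I know, newer Flink versions offer an AggregateFunction interface built around the same accumulator idea, which may be worth a look if buffering all events of a window becomes a problem.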
