我有一个订阅主题的FlinkKafkaConsumer011
。我希望处理(apply
(每个kafka消费者消息,因此自定义FooTrigger
返回每个元素的TriggerResult.FIRE
。
按照代码工作,我只是对timeWindowAll(Time.minutes(1))
感到困惑。看来我做错了什么。
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// create a Kafka consumer
FlinkKafkaConsumer011<Foo> consumer = new FlinkKafkaConsumer011<>(
"topic",
new Foo.FooSchema(),
props); // Properties object
// create Kafka consumer data source
DataStream<FooTuple> trades = env.addSource(consumer)
.timeWindowAll(Time.minutes(1))
.trigger(new FooTrigger())
.evictor(new FooEvictor())
.apply(new CreateFoos());
如果你的目标是将一个函数应用于流中的每个事件,那么在 Flink 中,ProcessFunction
将是一种更自然的方式。或者在更简单的情况下,您可以使用map或flatmap,或其丰富的变体,即RichMapFunction或RichFlatMapTFunction - 这完全取决于您要做什么。
使用映射或平面映射,您可以执行无状态的一对一或一对多转换,其丰富的变体可以使用键控状态,ProcessFunction 可以使用状态和计时器(前提是流已被键控(。
timeWindowAll 适用于流未按键分区的情况,并且您希望在由持续时间定义的批处理中执行非并行处理(对于键控并行窗口,请改用 timeWindow(。如果您只想在数据到达时处理数据,那么窗口化会增加不必要的复杂性。