Flink join to enrich a stream

  • Tags: Flink java apache-flink


I am new to Apache Flink and am using v1.9.0. I want to join multiple streams. When I run the example below, I get the following exception.

Exception:

15:18:51,839 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(SlidingEventTimeWindows(2, 1), EventTimeTrigger, CoGroupWindowFunction) -> Sink: Print to Std. Out (3/4) (ebc7985691707417b57a391ac83104f9) switched from RUNNING to FAILED.
java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
at org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows.assignWindows(SlidingEventTimeWindows.java:78)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

My main method:

public class ProcessingTimeJoinExercise {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Simulated trade stream
        DataStream<Trade> tradeStream = FinSources.tradeSource(env);
        // Simulated customer stream
        DataStream<Customer> customerStream = FinSources.customerSource(env);
        // Stream of enriched trades: join each trade with its customer by customerId
        DataStream<EnrichedTrade> joinedStream = tradeStream
            .join(customerStream).where(new KeySelector<Trade, Long>() {
                @Override
                public Long getKey(Trade trade) throws Exception {
                    return trade.customerId;
                }
            }).equalTo(new KeySelector<Customer, Long>() {
                @Override
                public Long getKey(Customer cust) throws Exception {
                    return cust.customerId;
                }
            }).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
            .apply(new JoinFunction<Trade, Customer, EnrichedTrade>() {
                @Override
                public EnrichedTrade join(Trade trade, Customer customer) {
                    return new EnrichedTrade(trade, customer);
                }
            });
        joinedStream.print();
        env.execute("processing-time join");
    }
}

Any idea what I am doing wrong?

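If you add

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

near the beginning, right after env is initialized, the job will run correctly. The join uses an event-time window (SlidingEventTimeWindows), but in Flink 1.9 the default time characteristic is processing time, which is the first cause the error message suggests.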
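To make the error message more concrete, here is a simplified plain-Java sketch of what a sliding event-time window assigner does with each record's timestamp. It is not Flink's actual source code: the class name SlidingWindowSketch and its assignWindows method are made up for illustration, and a window offset of zero is assumed. The point is only that a record whose timestamp is still Long.MIN_VALUE cannot be placed into any window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of sliding event-time window assignment.
// Not part of Flink; offset is assumed to be 0.
public class SlidingWindowSketch {

    /** Marker meaning "this record carries no timestamp". */
    public static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Returns the [start, end) bounds of every sliding window containing the timestamp. */
    public static List<long[]> assignWindows(long timestamp, long size, long slide) {
        if (timestamp == NO_TIMESTAMP) {
            // Without a timestamp there is no way to pick a window.
            throw new RuntimeException(
                "Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
        }
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards over all earlier windows that still cover the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same size and slide as in the question: 2 ms windows sliding by 1 ms.
        for (long[] w : assignWindows(5L, 2L, 1L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        try {
            assignWindows(NO_TIMESTAMP, 2L, 1L);
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With the question's window of size 2 ms and slide 1 ms, a record stamped 5 lands in two windows, [5, 7) and [4, 6), while a record without a timestamp is rejected. This is why the job needs to run in event time, with records that actually carry timestamps, before an event-time window can be applied.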
