The following code example does not work in Flink 1.3:
public class TumblingWindow {

    public static void main(String[] args) throws Exception {
        List<Content> data = new ArrayList<Content>();
        data.add(new Content(1L, "Hi"));
        data.add(new Content(2L, "Hallo"));
        data.add(new Content(3L, "Hello"));
        data.add(new Content(4L, "Hello"));
        data.add(new Content(7L, "Hello"));
        data.add(new Content(8L, "Hello world"));
        data.add(new Content(16L, "Hello world"));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Content> stream = env.fromCollection(data);
        DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
                    private static final long serialVersionUID = 410512296011057717L;

                    @Override
                    public long extractTimestamp(Content element) {
                        return element.getRecordTime();
                    }
                });

        Table table = tableEnv.fromDataStream(stream2,
                "urlKey,httpGetMessageCount,httpPostMessageCount" + ",uplink,downlink,statusCode,statusCodeCount,rowtime.rowtime");
        table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
                .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
        env.execute();
    }

    public static class Content implements Serializable {
        private String urlKey;
        private long recordTime;
        // private String recordTimeStr;
        private long httpGetMessageCount;
        private long httpPostMessageCount;
        private long uplink;
        private long downlink;
        private long statusCode;
        private long statusCodeCount;

        public Content() {
            super();
        }

        public Content(long recordTime, String urlKey) {
            super();
            this.recordTime = recordTime;
            this.urlKey = urlKey;
        }

        public String getUrlKey() {
            return urlKey;
        }

        public void setUrlKey(String urlKey) {
            this.urlKey = urlKey;
        }

        public long getRecordTime() {
            return recordTime;
        }

        public void setRecordTime(long recordTime) {
            this.recordTime = recordTime;
        }

        public long getHttpGetMessageCount() {
            return httpGetMessageCount;
        }

        public void setHttpGetMessageCount(long httpGetMessageCount) {
            this.httpGetMessageCount = httpGetMessageCount;
        }

        public long getHttpPostMessageCount() {
            return httpPostMessageCount;
        }

        public void setHttpPostMessageCount(long httpPostMessageCount) {
            this.httpPostMessageCount = httpPostMessageCount;
        }

        public long getUplink() {
            return uplink;
        }

        public void setUplink(long uplink) {
            this.uplink = uplink;
        }

        public long getDownlink() {
            return downlink;
        }

        public void setDownlink(long downlink) {
            this.downlink = downlink;
        }

        public long getStatusCode() {
            return statusCode;
        }

        public void setStatusCode(long statusCode) {
            this.statusCode = statusCode;
        }

        public long getStatusCodeCount() {
            return statusCodeCount;
        }

        public void setStatusCodeCount(long statusCodeCount) {
            this.statusCodeCount = statusCodeCount;
        }
    }

    private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractTimestamp(Object[] element, long previousElementTimestamp) {
            // TODO Auto-generated method stub
            return (long) element[0];
        }

        @Override
        public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }
}
It throws the following exception:
Exception in thread "main" org.apache.flink.table.api.TableException: The rowtime attribute can only be replace a field with a valid time type, such as Timestamp or Long.
    at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:450)
    at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:440)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:440)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:401)
    at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:88)
    at com.Taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:53)
However, if I remove statusCodeCount from the field list passed to fromDataStream, the example runs successfully without any exception:
Table table = tableEnv.fromDataStream(stream2,
        "urlKey,httpGetMessageCount,httpPostMessageCount" + ",uplink,downlink,statusCode,rowtime.rowtime");
table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
        .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
Any suggestions?
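This is a bug that has been filed as FLINK-6881. As a workaround, you can write your own StreamTableSource that also implements DefinedRowtimeAttribute (see also this documentation draft). A table source is also a nice way to hide the underlying DataStream API, which makes Table API programs more compact.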
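As a sanity check for whichever workaround you end up using, here is a minimal plain-Java sketch (no Flink involved) of the grouping that the tumbling-window query in the question is expected to perform. It assumes the record times are milliseconds since the epoch and that windows are aligned to the epoch; the class name `TumblingWindowSketch` and its helper methods are made up for illustration, and it counts records per group instead of summing the `uplink`/`downlink` fields.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: mimics tumbling-window grouping on plain arrays.
final class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains the given timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Counts records per "windowStart|urlKey" group, similar to groupBy("w, urlKey").
    static Map<String, Integer> countPerWindowAndKey(long[] timestamps, String[] keys, long windowSizeMillis) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            String group = windowStart(timestamps[i], windowSizeMillis) + "|" + keys[i];
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same record times and url keys as the sample data in the question.
        long[] timestamps = {1L, 2L, 3L, 4L, 7L, 8L, 16L};
        String[] keys = {"Hi", "Hallo", "Hello", "Hello", "Hello", "Hello world", "Hello world"};
        long oneHourMillis = 60L * 60L * 1000L;
        System.out.println(countPerWindowAndKey(timestamps, keys, oneHourMillis));
    }
}
```

With a one-hour window, all seven sample records fall into the single window starting at 0, so the query should produce one result row per distinct urlKey once the rowtime attribute is accepted.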