我使用的是使用Java的Flink Table API,我想将数据集转换为数据流。。。。以下是我的代码:
TableEnvironment tableEnvironment=new TableEnvironment();
Table tab1=table.where("related_value < 2014").select("related_value,ref_id");
DataSet<MyClass>ds2=tableEnvironment.toDataSet(tab1, MyClass.class);
DataStream<MyClass> d=tableEnvironment.toDataStream(tab1, MyClass.class);
但当我尝试执行这个程序时,它会抛出以下异常:
org.apache.flink.api.table.ExpressionException:JavaStreamingTranslator:Root(ArraySeq((related_value,Double),(ref_id,String))的根无效。您是否尝试将基于数据集的表转换为数据流,反之亦然我想知道如何使用Flink Table API将数据集转换为数据流??
我想知道的另一件事是,对于模式匹配,有Flink CEP库可用。但是使用Flink Table API进行模式匹配是否可行??
Flink的Table API不是为了将DataSet
转换为DataStream
而设计的,反之亦然。使用Table API不可能做到这一点,目前也没有其他方法可以使用Flink实现这一点。
统一DataStream
和DataSet
API(作为流的特殊情况,即作为有界流来处理批处理)是Flink的长期路线图。
使用TableEnvironment
时无法转换为DataStream API,必须创建一个StreamTableEnvironment
才能从表转换为DataStream,如下所示:
final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(configuration, fsSettings);
DataStream<String> finalRes = fsTableEnv.toAppendStream(tableNameHere, MyClass.class);
希望能以某种方式帮助你
谨致问候!