Does Flink SQL support the Java Map type?



I am trying to access a key from a map using Flink's SQL API. It fails with the error Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY. Please advise how I can fix it. Here is my event class:

public class EventHolder {

    private Map<String, String> event;

    public Map<String, String> getEvent() {
        return event;
    }

    public void setEvent(Map<String, String> event) {
        this.event = event;
    }
}

Here is the main class that submits the Flink job:

public class MapTableSource {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream);
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql("select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);
        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:" + value);
                return value != null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps() {
        List<EventHolder> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            EventHolder holder = new EventHolder();
            Map<String, String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

When I run it I get the following exception:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

I am using Flink 1.3.1.

I think the problem lies in fromCollection. Flink is not able to extract the needed type information because of a Java limitation, i.e., type erasure. Therefore, your map is treated as a black box with the SQL type ANY. You can verify the types of your table by using tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).
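A minimal sketch of that suggestion, assuming the stream elements are the maps themselves rather than the EventHolder POJO (getRawMaps() is a hypothetical helper returning List<Map<String, String>>); here TypeInformation.of with a TypeHint is used to build the map type information explicitly:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;

// Supplying explicit TypeInformation prevents the map from being erased to a
// generic type that the SQL planner reports as ANY.
DataStream<Map<String, String>> mapStream = env.fromCollection(
        getRawMaps(), // hypothetical helper returning List<Map<String, String>>
        TypeInformation.of(new TypeHint<Map<String, String>>() {}));

// Check what the registered table's schema looks like before querying it.
tableEnv.registerDataStream("mapEvent", mapStream);
tableEnv.scan("mapEvent").printSchema();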

I solved a similar issue with the following:

//Should probably make MapVal more generic, but works for this example
public class MapVal extends ScalarFunction {
    public String eval(Map<String, String> obj, String key) {
        return obj.get(key);
    }
}

public class Car {
    private String make;
    private String model;
    private int year;
    private Map<String, String> attributes;

    //getters/setters...
}

//After registering Stream and TableEnv etc
tableEnv.registerFunction("mapval", new MapVal());

Table cars = tableEnv
        .scan("Cars")
        .select("make, model, year, attributes.mapval('name')");
