I'm using a local Flink 1.6 cluster that is configured to use the flink-table jar (meaning my program's jar does not include flink-table). With the following code:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class JMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);
        tableEnv.registerFunction("enlist", new Enlister());

        DataSource<Tuple2<String, String>> source = execEnv.fromElements(
            new Tuple2<>("a", "1"),
            new Tuple2<>("a", "2"),
            new Tuple2<>("b", "3")
        );

        Table table = tableEnv.fromDataSet(source, "a, b")
            .groupBy("a")
            .select("enlist(a, b)");

        tableEnv.toDataSet(table, Row.class)
            .print();
    }

    public static class Enlister
        extends AggregateFunction<List<String>, ArrayList<String>>
        implements ResultTypeQueryable<List<String>>
    {
        @Override
        public ArrayList<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> getValue(ArrayList<String> acc) {
            return acc;
        }

        @SuppressWarnings("unused")
        public void accumulate(ArrayList<String> acc, String a, String b) {
            acc.add(a + ":" + b);
        }

        @SuppressWarnings("unused")
        public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
            for (ArrayList<String> otherAcc : it) {
                acc.addAll(otherAcc);
            }
        }

        @SuppressWarnings("unused")
        public void resetAccumulator(ArrayList<String> acc) {
            acc.clear();
        }

        @Override
        public TypeInformation<List<String>> getProducedType() {
            return TypeInformation.of(new TypeHint<List<String>>(){});
        }
    }
}
I get this strange exception:
org.apache.flink.table.api.ValidationException: Expression Enlister(List('a, 'b)) failed on input check: Given parameters do not match any signature.
Actual: (java.lang.String, java.lang.String)
Expected: (java.lang.String, java.lang.String)
However, if I don't implement ResultTypeQueryable, I get the expected output:
Starting execution of program
[b:3]
[a:1, a:2]
Program execution finished
Job with JobID 20497bd3efe44fab0092a05a8eb7d9de has finished.
Job Runtime: 270 ms
Accumulator Results:
- 56e0e5a9466b84ae44431c9c4b7aad71 (java.util.ArrayList) [2 elements]
My actual use case seems to require ResultTypeQueryable, because otherwise I get this exception:
The return type of function ... could not be determined automatically,
due to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface
Is there any way to work around this?
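Implementing ResultTypeQueryable is not correct in this case, and the exception is misleading. Instead, override getResultType() and getAccumulatorType(). The reason is that generics often cause problems when Flink generates type information for its serializers, because of Java's type erasure.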
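To illustrate the type-erasure point in plain Java (this is not Flink code): at runtime a `List<String>` and a `List<Integer>` share the same `Class`, so the element type cannot be recovered from an object. An anonymous subclass of a generic class, however, records its type argument in its generic superclass signature. Flink's `new TypeHint<List<String>>(){}` appears to rely on the same idea. The `ErasureDemo` and `TypeToken` names below are hypothetical and exist only for this sketch.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical "super type token": anonymous subclasses of this abstract
    // generic class keep their type argument in the class file's signature.
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // getGenericSuperclass() yields TypeToken<...> with the real argument.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    // After erasure, both lists have the same runtime class (ArrayList).
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    // The anonymous subclass still knows the full generic type.
    public static String capturedType() {
        return new TypeToken<List<String>>() {}.getType().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // true
        System.out.println(capturedType());     // java.util.List<java.lang.String>
    }
}
```

This is why a bare `List.class` is not enough for a serializer, while a type hint created through an anonymous subclass can carry `List<String>`.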
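I tried to reproduce the problem in a small program but couldn't; it only happens in my large project. Unfortunately, overriding getResultType() and getAccumulatorType() doesn't help either. In that case I get this exception: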
java.lang.IndexOutOfBoundsException
at org.apache.flink.api.java.typeutils.TupleTypeInfoBase.getTypeAt(TupleTypeInfoBase.java:199)
at org.apache.flink.api.java.typeutils.RowTypeInfo.getTypeAt(RowTypeInfo.java:179)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.isSortKey(Keys.java:444)
at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:150)
at org.apache.flink.api.java.operators.SortPartitionOperator.<init>(SortPartitionOperator.java:75)
at org.apache.flink.api.java.DataSet.sortPartition(DataSet.java:1414)
I actually get this exception even without the overrides. The only thing that works for me is basically this:
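// Types here is org.apache.flink.table.api.Types (Flink 1.6 Table API)

// Declare the result row explicitly: one field named "result" of type List<String>
String[] fieldNames = new String[] {
    "result"
};
TypeInformation<?>[] types = new TypeInformation<?>[] {
    TypeInformation.of(new TypeHint<List<String>>(){})
};
tableEnv.toDataSet(table, Types.ROW(fieldNames, types))...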