你能用泛型类型实现 Flink 的 AggregateFunction 吗?



我的目标是为 Flink 1.10 中的流处理模块提供一个接口。管道包含 AggregateFunction 以及其他运算符。所有运算符都有泛型类型,但问题出在 AggregateFunction 中,它无法确定输出类型。

注意:实际管道有一个滑动事件时间窗口分配器和一个与 AggregateFunction 一起传递的 WindowFunction,但使用以下代码可以更轻松地重现错误。

这是一个重现错误的简单测试用例:

@Test
public void aggregateFunction_genericType() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String,Integer>> source = env.fromElements(Tuple2.of("0",1), Tuple2.of("0",2), Tuple2.of("0",3));
ConfigAPI cfg = new ConfigAPI();
source
.keyBy(k -> k.f0)
.countWindow(5, 1)
.aggregate(new GenericAggregateFunc<>(cfg))
.print();

env.execute();
}

如您所见,配置类作为参数传递给自定义聚合函数。这是用户将实现的。

public static class ConfigAPI implements BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String,Integer>> {
@Override
public Tuple2<String, Integer> createAcc() {
return new Tuple2<>("0", 0);
}
@Override
public Tuple2<String, Integer> addAccumulators(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
acc.f1 += in.f1;
return acc;
}
}

提供的接口是:

public interface BaseConfigAPI<In, Acc> {
Acc createAcc();
Acc addAccumulators(In in, Acc acc);
// other methods to override
}

GenericAggregateFunction:

public static class GenericAggregateFunc<In, Acc> implements AggregateFunction<In, Acc, Acc> {
private BaseConfigAPI<In, Acc> cfg;
GenericAggregateFunc(BaseConfigAPI<In, Acc> cfg) {
this.cfg = cfg;
}
@Override
public Acc createAccumulator() {
return cfg.createAcc();
}
@Override
public Acc add(In in, Acc acc) {
return cfg.addAccumulators(in, acc);
}
@Override
public Acc getResult(Acc acc) {
return acc;
}
@Override
public Acc merge(Acc acc, Acc acc1) {
return null;
}
}

输出日志:

org.apache.flink.api.common.functions.InvalidTypesException: 
Type of TypeVariable 'Acc' in 'class misc.SlidingWindow$GenericAggregateFunc' could not be determined. This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 
Otherwise the type has to be specified explicitly using type information.

解决方案 1(不起作用): 起初我认为这是"无法确定返回类型"的常见情况,所以我尝试添加

.returns(Types.TUPLE(Types.STRING, Types.INT)).aggregate(...)但没有成功。

解决方案 2(工作): 我创建了一个带有泛型类型的 Wrapper 类,名为Accumulator<Acc>然后将其作为 Type 传递给AggregateFunction<In, Accumulator<Acc>, Accumulator<Acc>>,似乎正在工作。

不过,这看起来不是很优雅,并且与界面的其余部分不太一致。这个问题还有其他解决方案吗?

编辑:感谢您的时间和见解@deduper我想我找到了解决方案。

解决方案 3(工作):我创建了一个新界面,它通过以下方式扩展了我的BaseConfigAPIAggregateFunction

public interface MergedConfigAPI<In, Acc, Out> extends BaseConfigAPI, AggregateFunction<In, Acc, Out> {}
public interface BaseConfigAPI extends Serializable {
//These will be implemented directly from AggregateFunction interface
//Acc createAcc();
//Acc addAccumulators(In in, Acc acc);

//other methods to override
}

现在,用户只需实现MergedConfigAPI<In, Acc, Out>并将其作为参数传递给.aggregate(...)函数。

更新:我针对框架测试了@deduper的第三个解决方案,但它也不起作用。似乎异常是由Acc而不是Out类型引发的。仔细查看.aggregate运算符的内部结构,我意识到有一个重载的aggregate方法需要另外 2 个参数。一个TypeInformation<ACC> accumulatorType和一个TypeInformation<R> returnType.

这就是最简单的解决方案是如何出现的,无需任何代码重构。

解决方案 4(工作)

@Test
public void aggregateFunction_genericType() throws Exception {
...
.aggregate(
new GenericAggregateFunc<>(cfg), 
Types.TUPLE(Types.STRING, Types.INT),
Types.TUPLE(Types.STRING, Types.INT))
...
}

注意:从 Flink 1.10.1 开始,aggregate方法都用 @PublicEvolving 注释。

">你能用泛型类型实现 Flink 的 AggregateFunction 吗?">

是的。您可以。就像你自己已经做的那样。您的错误是由于您如何使用它(">使用站点泛型")而不是您如何实现它。

">...这个问题还有其他解决方案吗?...">

我按简单程度的升序提出以下三个候选解决方案......

...
source
.keyBy(k -> k.f0)
.countWindow(5, 1)
.aggregate(new GenericAggregateFunc< Tuple2<String, Integer>, Tuple2<String, Integer> >(cfg)) /* filling in the diamond will aid type inference */
.print();
...

以上是最简单的,因为你不必重构你的原始GenericAgregateFunc;只需用你想要实例化泛型类的特定类型参数填充菱形。

还有另一个稍微不那么简单的解决方案...

public static class GenericAggregateFunc implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
private BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg;
GenericAggregateFunc(BaseConfigAPI<Tuple2<String, Integer>, Tuple2<String, Integer>> cfg) {
this.cfg = cfg;
}
@Override
public Tuple2<String, Integer> createAccumulator() {
return cfg.createAcc();
}
@Override
public Tuple2<String, Integer> add(Tuple2<String, Integer> in, Tuple2<String, Integer> acc) {
return cfg.addAccumulators(in, acc);
}
@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
return acc;
}
@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc, Tuple2<String, Integer> acc1) {
return null;
}
}

虽然这涉及一个小的重构,但它比第一个提出的解决方案更简化了你的整个应用程序——在我看来

Flink 已经为你处理了">复杂"的泛型多态性。要插件到 Flink,您所要做的就是简单地使用您想要实例化的特定类型参数实例化其内置的泛型AggregateFunction<IN, ACC, OUT>。这些类型参数在您的情况下属于类型Tuple2<String, Integer>

因此,您仍然在第二种解决方案中">使用泛型",但您正在以更简单的方式执行此操作。

另一个更接近原始实现的选项,但进行了一些小的重构......

public static class GenericAggregateFunc<In, Acc, Out> implements AggregateFunction<In, Acc, Out> {

...
@Override
public Out getResult(Acc acc) {
return ...;
}
...
}

此外,要强制用户的配置实现与您的函数兼容的接口的前提条件......

public interface BaseConfigAPI< In, Acc, Out >{ ... }

在我的实验中,我已经确认将Out类型参数也添加到BaseConfigAPI中,使其兼容。

我确实想到了一个更复杂的替代解决方案。但是由于越简单几乎总是越好,我将把更复杂的解决方案留给其他人提出。

相关内容

  • 没有找到相关文章

最新更新