Flink:Flink 是否支持抽象算子,它可以处理具有公共字段的不同数据流?



假设我们有多个数据流,它们共享一些共同的特征。

例如,我们有一个教师流和一个学生流,它们都有一个年龄字段。如果我想从实时流中找出年龄最大的学生或老师,我可以实现如下运算符。

public MaxiumAgeFunc extends RichMapFunction<Student,Integer> {
int maxAge;
@Override
public void flatMap(Student s, Collector<Integer> collector) throws Exception {
if(s.age > maxAge){
maxAge = s.age;
}
collector.collect(maxAge);
}
}

要找出最年长的老师,我们需要实现一个类似的运算符,

如下所示
public MaxiumAgeFunc extends RichMapFunction<Teacher,Integer> {
int maxAge;
@Override
public void flatMap(Teacher t, Collector<Integer> collector) throws Exception {
if(t.age > maxAge){
maxAge = t.age;
}
collector.collect(maxAge);
}
}

但实际上这两个运算符具有共同的过程逻辑,所以我的想法是定义一个父类,例如People

public class People{
public Integer age;
}

然后,可以将学生教师定义为他们的子类,并保留自己的字段。

public class Student extends People {
public Integer grade;  // student grade
...
}
public class Student extends People {
public Integer subject;  // the subject that teacher teaches
...
}

在这种情况下,我可以定义一个运算符,如下所示。

public MaxiumAgeFunc extends RichMapFunction<People,Integer> {
int maxAge;
@Override
public void flatMap(People p, Collector<Integer> collector) throws Exception {
if(t.age > maxAge){
maxAge = p.age;
}
collector.collect(maxAge);
}
}

但是当我尝试使用此运算符来实现 Flink 执行拓扑时,由于数据类型不匹配,它不起作用。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentStream = env.addSource(...);
DataStream<Teacher> teacherStream = env.addSource(...);
studentStream.map(new MaxiumAgeFunc()).print();
teacherStream.map(new MaxiumAgeFunc()).print();

这是我的问题,是否可以为具有公共字段的输入流制作抽象运算符?

这与其说是 Flink 问题,不如说是一个 Java:

您要做的是像这样使MaxiumAgeFunc参数化

public MaxiumAgeFunc<T extends People> extends RichMapFunction<T, Integer> {
int maxAge;
@Override
public void flatMap(T p, Collector<Integer> collector) throws Exception {
if(t.age > maxAge){
maxAge = p.age;
}
collector.collect(maxAge);
}
}

然后像这样使用它

studentStream.map(new MaxiumAgeFunc<>()).print();
teacherStream.map(new MaxiumAgeFunc<>()).print();

编辑:

顺便说一句,您的函数无法使用检查点(因此从检查点恢复时会产生错误的结果(,我宁愿在全局窗口中使用聚合函数。

students
.windowAll(GlobalWindows.create())
.aggregate(new AggregateFunction<People, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return -1;
}
@Override
public Integer add(People value, Integer accumulator) {
return Math.max(value.age, accumulator);
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return Math.max(a, b);
}
});

相关内容

  • 没有找到相关文章

最新更新