Apache Flink Filter Function



我想在Apache Flink中实现一个自定义过滤函数,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。

让我们假设我的函数看起来像这个

public class CustomFilter implements FilterFunction{
@Override
public boolean filter(Object o) throws Exception{
String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
for(String s: values){
if(!o.toString().contains(s)) return false;
}
return true;
}
}

流媒体作业将如下所示:

public class StreamingJob{
...
env 
.fromElements("Data","New Data","First")
.filter(new CustomFilter())
.print
.execute();
}

当我尝试将某种集合添加到类中的CustomFilter函数参数(如(时

public boolean filter(String s, Collection<String> searchValues){
...
}

我得到的消息是,该函数必须来自类型String,因为它是一个已实现的函数。

正如其他人所指出的,只需保存通过构造函数传入的目标值列表,并在filter()方法中使用它们。

public class CustomFilter implements FilterFunction<Object> {
private String[] targetValues;
public CustomFilter(String[] targetValues) {
this.targetValues = targetValues;
}

}

相关内容

  • 没有找到相关文章

最新更新