我想在Apache Flink中实现一个自定义过滤函数,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。
让我们假设我的函数看起来像这个
public class CustomFilter implements FilterFunction{
@Override
public boolean filter(Object o) throws Exception{
String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
for(String s: values){
if(!o.toString().contains(s)) return false;
}
return true;
}
}
流媒体作业将如下所示:
public class StreamingJob{
...
env
.fromElements("Data","New Data","First")
.filter(new CustomFilter())
.print
.execute();
}
当我尝试将某种集合添加到类中的CustomFilter函数参数(如(时
public boolean filter(String s, Collection<String> searchValues){
...
}
我得到的消息是,该函数必须来自类型String,因为它是一个已实现的函数。
正如其他人所指出的,只需保存通过构造函数传入的目标值列表,并在filter()
方法中使用它们。
public class CustomFilter implements FilterFunction<Object> {
private String[] targetValues;
public CustomFilter(String[] targetValues) {
this.targetValues = targetValues;
}
}