Apache Flink: ProcessWindowFunction is not applicable



我想在我的Apache Flink项目中使用ProcessWindowFunction。但是我在使用进程函数时遇到一些错误,请参见下面的代码片段

错误是:

类型为 WindowedStream,Tuple,TimeWindow>> 中的方法 process(ProcessWindowFunction,R,Tuple,TimeWindow不适用于参数 (JDBCExample.MyProcessWindows(

我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

我的ProcessWindowFunction

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }
}

问题可能是ProcessWindowFunction的泛型类型。

您正在按位置 ( keyBy(0) ( 引用密钥。因此,编译器无法推断其类型(String(,您需要将ProcessWindowFunction更改为:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

通过将 String 替换为 Tuple您现在拥有了一个通用的键占位符,当您需要在 processElement() 方法中访问键时,可以强制转换为Tuple1<String>

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {
  String sKey = (String)((Tuple1)key).f0;
  ...
}

如果定义 KeySelector<IN, KEY> 函数来提取密钥,则可以避免强制转换并使用正确的类型,因为编译器已知KeySelector的返回类型KEY

Fabian :)使用Tuple应该有效,但确实涉及ProcessWindowFunction中的一些丑陋类型转换。使用KeySelector很容易,并且会产生更干净的代码。例如

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {
    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

然后,通过上述内容,您可以定义如下ProcessWindowFunction

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {

相关内容

  • 没有找到相关文章

最新更新