我想在我的Apache Flink项目中使用ProcessWindowFunction
。但是我在使用进程函数时遇到一些错误,请参见下面的代码片段
错误是:
类型为 WindowedStream,Tuple,TimeWindow>> 中的方法 process(ProcessWindowFunction,R,Tuple,TimeWindow不适用于参数 (JDBCExample.MyProcessWindows(
我的程序:
DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process(new MyProcessWindows());
我的ProcessWindowFunction
:
private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
public void process(
String key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception
{
...
}
}
问题可能是ProcessWindowFunction
的泛型类型。
您正在按位置 ( keyBy(0)
( 引用密钥。因此,编译器无法推断其类型(String
(,您需要将ProcessWindowFunction
更改为:
private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>
通过将 String
替换为 Tuple
您现在拥有了一个通用的键占位符,当您需要在 processElement()
方法中访问键时,可以强制转换为Tuple1<String>
:
public void process(
Tuple key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception {
String sKey = (String)((Tuple1)key).f0;
...
}
如果定义 KeySelector<IN, KEY>
函数来提取密钥,则可以避免强制转换并使用正确的类型,因为编译器已知KeySelector
的返回类型KEY
。
Fabian :)使用Tuple
应该有效,但确实涉及ProcessWindowFunction
中的一些丑陋类型转换。使用KeySelector
很容易,并且会产生更干净的代码。例如
.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {
@Override
public String getKey(Tuple2<String, JsonObject> in) throws Exception {
return in.f0;
}
})
然后,通过上述内容,您可以定义如下ProcessWindowFunction
:
public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {