我正在学习 Flink,我从使用 DataStream 的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到 3 个或更多单词的结果。
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9000)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new MyWindowFunction())
.sum(1)
.filter(word -> word.f1 >= 3);
我想创建一个窗口函数来按找到的单词的值对输出进行排序。我尝试实现的窗口函数根本无法编译。我正在努力定义 WindowFunction 接口的应用方法和参数。
public static class MyWindowFunction implements WindowFunction<
Tuple2<String, Integer>, // input type
Tuple2<String, Integer>, // output type
Tuple2<String, Integer>, // key type
TimeWindow> {
void apply(Tuple2<String, Integer> key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
String word = ((Tuple2<String, Integer>)key).f0;
Integer count = ((Tuple2<String, Integer>)key).f1;
.........
out.collect(new Tuple2<>(word, count));
}
}
我正在更新这个答案以使用 Flink 1.12.0。为了对流的元素进行排序,我不得不在用ReduceFunction
对流进行计数后使用KeyedProcessFunction
。然后我必须将最后一个转换的并行性设置为1
,以便不更改我使用KeyedProcessFunction
排序的元素的顺序。我使用的序列是socketTextStream
->flatMap
->keyBy
->reduce
->keyBy
->process
->print().setParallelism(1)
。下面的例子是:
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9000)
.flatMap(new SplitterFlatMap())
.keyBy(new WordKeySelector())
.reduce(new SumReducer())
.keyBy(new WordKeySelector())
.process(new SortKeyedProcessFunction(3 * 1000))
.print().setParallelism(1);
String executionPlan = env.getExecutionPlan();
System.out.println("ExecutionPlan ........................ ");
System.out.println(executionPlan);
System.out.println("........................ ");
env.execute("Window WordCount sorted");
}
}
我用来对流进行排序的 UDF 是扩展KeyedProcessFunction
的SortKeyedProcessFunction
。我使用Event implements Comparable<Event>
ValueState<List<Event>> listState
将排序列表作为状态。在processElement
方法上,我注册了将事件添加到状态context.timerService().registerProcessingTimeTimer(timeoutTime);
的时间戳,并在onTimer
方法收集事件。我在这里也使用了 3 秒的时间窗口。
public class SortKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Event> {
private static final long serialVersionUID = 7289761960983988878L;
// delay after which an alert flag is thrown
private final long timeOut;
// state to remember the last timer set
private ValueState<List<Event>> listState = null;
private ValueState<Long> lastTime = null;
public SortKeyedProcessFunction(long timeOut) {
this.timeOut = timeOut;
}
@Override
public void open(Configuration conf) {
// setup timer and HLL state
ValueStateDescriptor<List<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<List<Event>>() {
}));
listState = getRuntimeContext().getState(descriptor);
ValueStateDescriptor<Long> descriptorLastTime = new ValueStateDescriptor<Long>(
"lastTime",
TypeInformation.of(new TypeHint<Long>() {
}));
lastTime = getRuntimeContext().getState(descriptorLastTime);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context context, Collector<Event> collector) throws Exception {
// get current time and compute timeout time
long currentTime = context.timerService().currentProcessingTime();
long timeoutTime = currentTime + timeOut;
// register timer for timeout time
context.timerService().registerProcessingTimeTimer(timeoutTime);
List<Event> queue = listState.value();
if (queue == null) {
queue = new ArrayList<Event>();
}
Long current = lastTime.value();
queue.add(new Event(value.f0, value.f1));
lastTime.update(timeoutTime);
listState.update(queue);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
// System.out.println("onTimer: " + timestamp);
// check if this was the last timer we registered
System.out.println("timestamp: " + timestamp);
List<Event> queue = listState.value();
Long current = lastTime.value();
if (timestamp == current.longValue()) {
Collections.sort(queue);
queue.forEach( e -> {
out.collect(e);
});
queue.clear();
listState.clear();
}
}
}
class Event implements Comparable<Event> {
String value;
Integer qtd;
public Event(String value, Integer qtd) {
this.value = value;
this.qtd = qtd;
}
public String getValue() { return value; }
public Integer getQtd() { return qtd; }
@Override
public String toString() {
return "Event{" +"value='" + value + ''' +", qtd=" + qtd +'}';
}
@Override
public int compareTo(@NotNull Event event) {
return this.getValue().compareTo(event.getValue());
}
}
因此,当我使用$ nc -lk 9000
并在控制台上键入单词时,我会在输出上按顺序看到它们
...
Event{value='soccer', qtd=7}
Event{value='swim', qtd=5}
...
Event{value='basketball', qtd=9}
Event{value='soccer', qtd=8}
Event{value='swim', qtd=6}
其他 UDF 用于流程序的其他转换,它们在这里是为了完整性。
public class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 3121588720675797629L;
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : sentence.split(" ")) {
out.collect(Tuple2.of(word, 1));
}
}
}
public class WordKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}
public class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> event1, Tuple2<String, Integer> event2) throws Exception {
return Tuple2.of(event1.f0, event1.f1 + event2.f1);
}
}
.sum(1)
方法将做你需要的一切(不需要使用apply()
),只要Splitter
类(应该是FlatMapFunction
)发出Tuple2<String, Integer>
记录,其中String
是单词,Integer
总是1
。
因此,.sum(1)
将为您进行聚合。如果您需要与sum()
不同的东西,您通常会使用.reduce(new MyCustomReduceFunction())
,因为就不需要缓冲内存中的大量数据而言,这将是最有效和可扩展的方法。