我有以下问题:我收到的消息必须分组,每组消息都必须处理。我只能检测到每个组的第一条消息。在该特定的第一条消息之后,以下消息属于该组,直到检测到下一组的第一条信息为止。
我解决这个问题的方法是编写一个自定义触发器,当他检测到组的第一条消息时(通过重写onElement(,该触发器会返回FIRE_PURGE。我的目标是将一个组的所有消息分配到一个窗口。
这种方法的问题是,每个组的第一条消息总是分配给前一组的窗口。
我得到的是:[aaaaaaaab],[bbbbbbbc]。。。我想要的是:[aaaaaa],[bbbbbbb]。。。
主要功能的相关代码:
esRawInputStream.filter(new FilterFunction<JsonNode>() {
@Override
public boolean filter(JsonNode doc) throws Exception {
return // some condition
}
}).keyBy(new KeySelector<JsonNode, String>() {
@Override
public String getKey(JsonNode doc) throws Exception {
return doc.findValue("meta_charge_point_id").asText();
}
}).window(GlobalWindows.create())
.trigger(new CustomEventTrigger<JsonNode, GlobalWindow>())
.fold(new SessionBucket(), new FoldFunction<JsonNode, SessionBucket>() {
@Override
public SessionBucket fold(SessionBucket b, JsonNode msg) throws Exception {
b.addMessage(msg);
return b;
}
}).addSink(new FileSink<SessionBucket>());
触发器:
public class CustomEventTrigger<T, W extends Window> extends Trigger {
private String currentSessionId = "foo";
@Override
public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
JsonNode jsonElement = null;
if (element instanceof JsonNode) {
jsonElement = (JsonNode) element;
} else {
// raise
}
TriggerResult res = TriggerResult.CONTINUE;
String elementSessionId = jsonElement.findValue("ocpp_session_id").asText();
if (!elementSessionId.equals(currentSessionId)) {
currentSessionId = elementSessionId;
res = TriggerResult.FIRE_AND_PURGE;
}
return res;
}
@Override
public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
return null;
}
@Override
public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) throws Exception {
return null;
}
@Override
public void clear(Window window, TriggerContext ctx) throws Exception {
}
}
这个用例不太适合Flink的window API。让我提出一个替代方案,那就是使用有状态的平面图函数来实现这一点。
下面是一个可能的例子:
public class Segmenting {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(1, 2, 2, 3, 3, 3, 1, 4, 4, 4, 4, 2, 2)
// key the stream so we can used keyed state
.keyBy(event -> 1)
.flatMap(new RichFlatMapFunction<Integer, List<Integer>>() {
private transient ValueState<Integer> currentValue;
private transient ListState<Integer> list;
@Override
public void open(Configuration parameters) throws Exception {
currentValue = getRuntimeContext().getState(new ValueStateDescriptor<>("currentValue", Integer.class));
list = getRuntimeContext().getListState(new ListStateDescriptor<>("list", Integer.class));
}
@Override
public void flatMap(Integer event, Collector<List<Integer>> collector) throws Exception {
Integer value = currentValue.value();
if (value == event) {
list.add(event);
} else {
if (value != null) {
List<Integer> result = new ArrayList<>();
list.get().forEach(result::add);
collector.collect(result);
}
currentValue.update(event);
list.clear();
list.add(event);
}
}
})
.print();
env.execute();
}
}
输出为
[1]
[2, 2]
[3, 3, 3]
[1]
[4, 4, 4, 4]
顺便说一句,我假设数据是有序的,并避免并行处理以保持其有序。对于大多数流处理应用程序来说,这将是一个不切实际的假设。如果您的数据出现问题,您可以将其作为起点,但最终解决方案将更加复杂。