我正在尝试在Flink作业的窗口函数中使用HashMap。所有并行运算符的所有元素都可以存储在一个运算符的 HashMap 中吗?
public class SeewoUserWindowFunction implements WindowFunction<ObjectNode, LabelInfo, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(SeewoUserWindowFunction.class);
@Override
public void apply(String s, TimeWindow timeWindow, Iterable<ObjectNode> iterable, Collector<LabelInfo> collector) throws Exception {
try {
HashMap<String, LabelInfo> result = new HashMap<>();
iterable.forEach(e -> {
String key = e.get("value").get("$tid").toString() + "/" + e.get("value").get("$code").toString();
if (result.containsKey(key)) {
result.put(key, result.get(key).update(e, timeWindow.getEnd()));
} else {
result.put(key, LabelInfo.of(e, timeWindow.getEnd()));
}
});
result.values().stream().forEach(labelInfo -> collector.collect(labelInfo));
} catch (Exception exception) {
logger.error("parse exception!", exception);
}
}
}
在您的情况下,每个并行运算符将简单地保留自己的HashMap
,但这在很大程度上取决于您流的分区。这里有类似的问题可以解释运营商之间的通信。 如果您出于某种原因希望有可能将流的所有元素保留在HashMap
并使用parallelism > 1
.您可以在数据流上调用global()
,这将导致流的所有元素仅通过并行运算符的一个实例,这基本上允许您将所有流元素存储在HashMap
中,但请记住,这可能会在吞吐量和延迟方面产生可怕的后果。
可以使用org.apache.flink.streaming.api.datastream.DataStream#windowAll
方法将所有元素收集到全局窗口中。
请参阅此文档。