对于我们的一个用例,我们需要根据文件中的更改重做一些计算,然后广播该文件的结果,以便我们可以在其他流中使用它。
程序的生命周期,几乎是这样的:
数据流 1:监控文件 ->检测一些变化 ->重新处理文件中的所有元素 ->计算一个结果 ->广播
数据流 2:一些转换 -> DS2 中的每个元素做一些事情,同时使用所有现有的广播元素(在广播元素中可以容忍一段时间的一些数据丢失)
我将给出一些代码示例来更好地解释问题所在:
所以这是DS1: 映射每个元素,将它们发送到化简器,然后计算总计
env.readFile(format, clientPath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)
.map(new Adder())
.keyBy(Map::size)
.reduce(new Reducer());
这是映射阶段,它只是从一行创建一个哈希图
public static class Adder extends RichMapFunction<String, Map<String, String>> {
private static final long serialVersionUID = 1L;
@Override
public Map<String, String> map(String string) throws Exception {
String[] strings = string.split("=");
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put(strings[0], strings[1]);
return hashMap;
}
}
这是最后一步,减速器。获取来自映射器的所有简化元素,然后返回总计,单个哈希映射
public static class Reducer extends RichReduceFunction<Map<String, String>> {
private static final long serialVersionUID = 1L;
@Override
public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {
stringStringMap.putAll(t1);
return stringStringMap;
}
}
然后 DS1 像下面的代码段一样广播。
MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("Brodcasted map state", Types.STRING, Types.STRING);
BroadcastStream<Map<String, String>> broadcastedProperties = clientProperties.broadcast(descriptor);
ds2.connect(broadcastedProperties).process(new EventListener(properties));
在给定时间内使用以下元素
Time Document
T1 K1=V1, K2=V2
T2 K2=V2
T3 K3=V3, K1=V4
当我运行我们的程序时,我所期望的是:
Time Broadcasted Elements
T1 K1=V1, K2=V2
T2 K2=V2
T3 K3=V3, K1=V4
我看到的是这样的:
Time Broadcasted Elements
T1 K1=V1, K2=V2
T2 K1=V1, K2=V2
T3 K1=V4, K2=V2, K3=V3
为了克服这个问题,我所做的只是简单地在数据流上取一个窗口,并使用带有累加器而不是化简器的聚合函数,但我更愿意采用非窗口方法。
我做了一些调试,我意识到的是,即使在映射阶段它只映射该时间内的可用元素,但在减少阶段,它正在根据先前的状态进行缩减(我的意思是时间的结果 – 1)+ 该点的所有元素。我觉得在减少阶段有一个不可见的状态很奇怪。从我的角度来看,它应该只基于直接来自映射器的元素。也许我对 Flink 中 reduce 的理解是错误的,但我很想得到一些澄清。
是的,当 Flink 的任何内置聚合器(例如 sum、max、reduce 等)应用于流时,它会以增量、有状态的方式聚合整个流。或者更准确地说,这是在KeyedStreams上完成的,聚合是逐个键完成的,但以一种持续的、无界的方式。例如,如果您在整数 1、2、3、4、5、...然后 sum() 将产生流 1, 3, 6, 10, 15, ...在您的情况下,reduce() 将生成一个不断更新的流,其中包含越来越多的键/值对。
如果您要按时间键控流,那么您应该得到所需的结果,但键控状态仍将永远保持,这可能会有问题。我建议你要么使用window API,要么使用RichFlatMap或ProcessFunction之类的东西,在那里你可以直接管理状态。
没有窗口的Reduce函数将是一个滚动的reduce。如果要在滚动缩减之间保持一致的状态,请使用状态对象来保存状态,稍后检索并更新它。我认为这也是@David Anderson对RichReduceFunction的建议。
public static class Reducer extends RichReduceFunction<Map<String, String>> {
private static final long serialVersionUID = 1L;
private final MapStateDescriptor<String, String> mapStateDesc = new MapStateDescriptor<>("myMapState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@Override
public void open(Configuration parameters) {
getRuntimeContext().getMapState(this.mapStateDesc);
}
@Override
public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {
MapState<String, String> myMapState = getRuntimeContext().getMapState(this.mapStateDesc);
HashMap<String, String> newMap = new HashMap<>();
//updating your map from previous state
for(Map.Entry<String,String> entry : myMapState.entries()) {
newMap.put(entry.getKey(),entry.getValue());
}
newMap.putAll(stringStringMap);
newMap.putAll(t1);
//update the state with latest data set
myMapState.putAll(newMap);
return newMap;
}