How to split and merge data (vectors) in Apache Flink without using windows



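I need to split a cube of integers into vectors, perform an operation on each vector (a simple addition), and then merge the vectors back into a cube. The vector operations should run in parallel (i.e., one vector per stream). The cube is an object that contains an ID.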

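I can split the cube into vectors, create a tuple containing the cube ID for each vector, and then use keyBy(id) so that each cube's vectors form their own partition. However, it seems that I then have to use a window over some time unit to merge them. The application is very latency-sensitive, so I would rather combine the vectors as they arrive, perhaps using some kind of logical clock (I know how many vectors a cube contains), and send the reassembled cube downstream as soon as its last vector arrives. Is this possible in Flink?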

Here is a code snippet that illustrates the idea:

// Stream topology
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Cube> stream = env
        // Take cubes from a collection and send them downstream
        .fromCollection(cubes)
        // Split each cube (int[][][]) into vectors (int[]) and send them downstream
        .flatMap(new VSplitter()) // returns a tuple with the cube id at position 1
        .keyBy(1)
        // Add one to every element of each vector
        .map(new MapFunction<Tuple2<CubeVector, Integer>, Tuple2<CubeVector, Integer>>() {
            @Override
            public Tuple2<CubeVector, Integer> map(Tuple2<CubeVector, Integer> cVec) throws Exception {
                CubeVector cv = cVec.getField(0);
                cv.cubeVectorAdd(1);
                cVec.setField(cv, 0);
                return cVec;
            }
        })
        //** Merge vectors back to a cube **//
        // ...
// Splits a cube into vectors
public static class VSplitter implements FlatMapFunction<Cube, Tuple2<CubeVector, Integer>> {
    @Override
    public void flatMap(Cube cube, Collector<Tuple2<CubeVector, Integer>> out) throws Exception {
        for (CubeVector cv : cubeVSplit(cube)) {
            //out.assignTimestamp()
            out.collect(new Tuple2<CubeVector, Integer>(cv, cube.getId()));
        }
    }
}
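The question does not show cubeVSplit or how a cube is rebuilt. The following is a plain-Java sketch, assuming the cube is a rectangular int[][][] and that each vector is the innermost array cube[x][y]; the IndexedVector class and the split/merge methods are hypothetical. Each vector carries its (x, y) position because, if the per-vector map runs before the keyBy so that one cube's vectors are spread across parallel instances, Flink does not guarantee the order in which they reach the merge step. (With keyBy(1) placed before the map, as in the snippet above, all vectors of one cube are processed by the same parallel instance.)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CubeSplitMergeSketch {

    // Hypothetical element type: one innermost vector of the cube plus its position.
    static final class IndexedVector {
        final int x;        // first index in the cube
        final int y;        // second index in the cube
        final int[] values; // a copy of cube[x][y]

        IndexedVector(int x, int y, int[] values) {
            this.x = x;
            this.y = y;
            this.values = values;
        }
    }

    // Split a cube into its innermost vectors, remembering where each one came from.
    static List<IndexedVector> split(int[][][] cube) {
        List<IndexedVector> out = new ArrayList<>();
        for (int x = 0; x < cube.length; x++) {
            for (int y = 0; y < cube[x].length; y++) {
                out.add(new IndexedVector(x, y, cube[x][y].clone()));
            }
        }
        return out;
    }

    // Rebuild a cube of the given shape; the order of 'vectors' does not matter.
    static int[][][] merge(Iterable<IndexedVector> vectors, int dimX, int dimY) {
        int[][][] cube = new int[dimX][dimY][];
        for (IndexedVector v : vectors) {
            cube[v.x][v.y] = v.values;
        }
        return cube;
    }

    public static void main(String[] args) {
        int[][][] cube = {
            {{1, 2}, {3, 4}},
            {{5, 6}, {7, 8}}
        };
        List<IndexedVector> vectors = split(cube);
        // Simulate the per-vector work (add one to every element).
        for (IndexedVector v : vectors) {
            for (int i = 0; i < v.values.length; i++) {
                v.values[i] += 1;
            }
        }
        // Simulate an arbitrary arrival order at the merge step.
        Collections.reverse(vectors);
        System.out.println(Arrays.deepToString(merge(vectors, 2, 2)));
    }
}
```

Running main prints [[[2, 3], [4, 5]], [[6, 7], [8, 9]]]: the cube is reassembled correctly even though the vectors were merged in reverse order.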

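You can use a FlatMapFunction that keeps accumulating CubeVectors until it has seen enough of them to reconstruct the Cube. Because it runs on a keyed stream, its state is scoped to each cube ID, and the cube is emitted as soon as its last vector arrives, without any window. The following snippet should do the trick: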

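import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

DataStream<Tuple2<CubeVector, Integer>> input = ...
input
    .keyBy(1) // key by the cube id in field 1, so all vectors of one cube share state
    .flatMap(new RichFlatMapFunction<Tuple2<CubeVector, Integer>, Cube>() {
        // Keyed state: the vectors received so far for the current cube id, and their count
        private transient ListState<CubeVector> cubeVectors;
        private transient ValueState<Integer> cubeVectorCounter;

        @Override
        public void open(Configuration parameters) {
            // Descriptors are created here: an anonymous class cannot declare
            // non-constant static fields (before Java 16)
            cubeVectors = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("cubeVectors", CubeVector.class));
            cubeVectorCounter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("cubeVectorCounter", BasicTypeInfo.INT_TYPE_INFO));
        }

        @Override
        public void flatMap(Tuple2<CubeVector, Integer> cubeVectorIntegerTuple2, Collector<Cube> collector) throws Exception {
            cubeVectors.add(cubeVectorIntegerTuple2.f0);
            // value() returns null before the first update for a key, so treat it as 0
            final Integer storedCount = cubeVectorCounter.value();
            final int newCounterValue = (storedCount == null ? 0 : storedCount) + 1;
            if (newCounterValue == NUMBER_CUBE_VECTORS) {
                // Last vector of this cube: rebuild and emit it immediately
                Cube cube = createCube(cubeVectors.get());
                // Release the state held for this cube id
                cubeVectors.clear();
                cubeVectorCounter.clear();
                collector.collect(cube);
            } else {
                cubeVectorCounter.update(newCounterValue);
            }
        }
    });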
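To see the counting logic in isolation, here is a Flink-free sketch. It assumes a fixed NUMBER_CUBE_VECTORS of 3 and uses strings as placeholder vectors; the CountingMergeSketch class and its onVector method are hypothetical, and a HashMap keyed by cube id only stands in for Flink's keyed state (in a real job, Flink scopes that state per key and checkpoints it).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountingMergeSketch {

    // Number of vectors that make up one cube (assumed fixed and known up front).
    static final int NUMBER_CUBE_VECTORS = 3;

    // Stand-in for Flink's keyed ListState: one buffer per cube id.
    private final Map<Integer, List<String>> buffers = new HashMap<>();

    // Mirrors flatMap(): buffer the vector; return the completed cube, or null.
    List<String> onVector(int cubeId, String vector) {
        List<String> buffer = buffers.computeIfAbsent(cubeId, id -> new ArrayList<>());
        buffer.add(vector);
        if (buffer.size() == NUMBER_CUBE_VECTORS) {
            // Last vector arrived: emit at once and drop the state for this key.
            return buffers.remove(cubeId);
        }
        return null;
    }

    public static void main(String[] args) {
        CountingMergeSketch merger = new CountingMergeSketch();
        // Vectors of cubes 7 and 8 arrive interleaved; no window or timer is involved.
        int[] ids = {7, 8, 7, 8, 7, 8};
        String[] vectors = {"a0", "b0", "a1", "b1", "a2", "b2"};
        for (int i = 0; i < ids.length; i++) {
            List<String> cube = merger.onVector(ids[i], vectors[i]);
            if (cube != null) {
                System.out.println("cube " + ids[i] + " -> " + cube);
            }
        }
    }
}
```

Running main prints "cube 7 -> [a0, a1, a2]" followed by "cube 8 -> [b0, b1, b2]". Here the buffer size doubles as the counter; Flink's ListState only exposes its contents as an Iterable and has no size method, which is presumably why the answer keeps a separate ValueState counter.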
