如何丢弃存储在检查点目录的状态文件夹中的上一批聚合数据



我有一个spark结构化的流媒体作业,它将从kafka获取数据,并使用forEachBatch将数据保存到Neo4j中,如下所示:

StreamingQuery query = eventsDf
.writeStream()
.queryName("streaming")
.outputMode("update")
.trigger(Trigger.ProcessingTime(80000))
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>) (dfBatch, batchId) -> {
}
)
.option("checkpointLocation", "src/main/resources/checkpoint")
.start();

eventsDf由聚合数据和筛选数据组成(此外,我没有时间戳列来使用窗口/水印(

第一批将有数据,80年代后,由于max函数的聚合,将有第二批由前一批的数据组成我需要什么:

  1. 放弃以前的批处理数据
  2. 检查点位置的State文件夹中的数据大小持续增加State文件夹由增量文件和检查点文件组成即使应用了minDeltasForSnapshot配置,文件仍在持续增长

我尝试了不同的火花配置:

"spark.sql.streaming.minBatchesToRetain", 2
"spark.sql.streaming.stateStore.minDeltasForSnapshot", 2
"cleanSource", "delete"
"spark.sql.streaming.forceDeleteTempCheckpointLocation", true

更新

我的代码中有groupbyaggregation,所以我使用.withColumn()添加了batchId作为新列,然后执行groupbyaggregation。在这次更改之后,我的delta文件没有持续增长(能够解决这个问题(,但我的snapshot文件正在持续增长。

如何防止这种情况发生?

我将伪数据存储在状态文件(即增量和快照文件(中,这将阻止我的状态增长(通过进行这些更改,引发结构化流作业将成为无状态作业(

以下代码片段位于forEachBatch函数内部

dfBatch
.groupByKey((MapFunction<Row, String>) (row) -> row.getAs("id"), Encoders.STRING())
.flatMapGroupsWithState(mappingFunction, OutputMode.Update(), Encoders.bean(StateData.class), batchEncoder, ProcessingTimeTimeout())

ExpressionEncoder<Row> batchEncoder = RowEncoder.apply(batchSchema());其中batchSchema((返回SchemaType为数据类型的变量

映射功能:

private FlatMapGroupsWithStateFunction<String, Row, StateData, Row> mappingFunction = (key, value, state) -> {
if (state.hasTimedOut()) {
state.remove();
} else if (state.exists()) {
StateData existingState = state.get();
Map<String, Integer> data = existingState.getData();
int existingCount = data.getOrDefault(key, 0);
data.put(key, existingCount + 1);
existingState.setData(data);
state.update(existingState);
} else {
Map<String, Integer> data = new HashMap<>();
data.put(key, 1);
StateData newState = new StateData(data);
state.update(newState);
}
return value;
};

StateData是我的映射类:

package com.spark;
import java.util.Map;
public class StateData {
Map<String, Integer> data;
public StateData(Map<String, Integer> data) {
this.data = data;
}
public Map<String, Integer> getData() {
return data;
}
public void setData(Map<String, Integer> data) {
this.data = data;
}
}

最新更新