我有一个spark结构化的流媒体作业,它将从kafka获取数据,并使用forEachBatch将数据保存到Neo4j中,如下所示:
StreamingQuery query = eventsDf
.writeStream()
.queryName("streaming")
.outputMode("update")
.trigger(Trigger.ProcessingTime(80000))
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>) (dfBatch, batchId) -> {
}
)
.option("checkpointLocation", "src/main/resources/checkpoint")
.start();
eventsDf由聚合数据和筛选数据组成(此外,我没有时间戳列来使用窗口/水印(
第一批将有数据,80年代后,由于max
函数的聚合,将有第二批由前一批的数据组成我需要什么:
- 放弃以前的批处理数据
- 检查点位置的
State
文件夹中的数据大小持续增加State
文件夹由增量文件和检查点文件组成即使应用了minDeltasForSnapshot
配置,文件仍在持续增长
我尝试了不同的火花配置:
"spark.sql.streaming.minBatchesToRetain", 2
"spark.sql.streaming.stateStore.minDeltasForSnapshot", 2
"cleanSource", "delete"
"spark.sql.streaming.forceDeleteTempCheckpointLocation", true
更新:
我的代码中有groupby
和aggregation
,所以我使用.withColumn()
添加了batchId作为新列,然后执行groupby
和aggregation
。在这次更改之后,我的delta文件没有持续增长(能够解决这个问题(,但我的snapshot
文件正在持续增长。
如何防止这种情况发生?
我将伪数据存储在状态文件(即增量和快照文件(中,这将阻止我的状态增长(通过进行这些更改,引发结构化流作业将成为无状态作业(
以下代码片段位于forEachBatch
函数内部
dfBatch
.groupByKey((MapFunction<Row, String>) (row) -> row.getAs("id"), Encoders.STRING())
.flatMapGroupsWithState(mappingFunction, OutputMode.Update(), Encoders.bean(StateData.class), batchEncoder, ProcessingTimeTimeout())
ExpressionEncoder<Row> batchEncoder = RowEncoder.apply(batchSchema());
其中batchSchema((返回SchemaType
为数据类型的变量
映射功能:
private FlatMapGroupsWithStateFunction<String, Row, StateData, Row> mappingFunction = (key, value, state) -> {
if (state.hasTimedOut()) {
state.remove();
} else if (state.exists()) {
StateData existingState = state.get();
Map<String, Integer> data = existingState.getData();
int existingCount = data.getOrDefault(key, 0);
data.put(key, existingCount + 1);
existingState.setData(data);
state.update(existingState);
} else {
Map<String, Integer> data = new HashMap<>();
data.put(key, 1);
StateData newState = new StateData(data);
state.update(newState);
}
return value;
};
StateData
是我的映射类:
package com.spark;
import java.util.Map;
public class StateData {
Map<String, Integer> data;
public StateData(Map<String, Integer> data) {
this.data = data;
}
public Map<String, Integer> getData() {
return data;
}
public void setData(Map<String, Integer> data) {
this.data = data;
}
}