如何仅从文件中处理新记录?



我有一个处理文件中记录的方案。文件中的数据会定期添加(每毫秒(。所以我需要读取文件并处理它,同时只处理新添加的记录。

我遇到了基于Spark SQL构建的Spark Structured Streaming的概念。我正在做的是——

  1. 每 1 秒触发一次文件流处理
  2. 对文件运行 Spark SQL 查询
  3. 以追加模式在控制台上写入查询的输出。

下面是相同的代码 -

public static class SparkStreamer implements Runnable,Serializable {
@Override
public void run() {
processDataStream();
}
private void processDataStream() {
Dataset<Row> rowData = spark.readStream().format("Text").load("C:\Test\App\");
Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String row) throws Exception {
return Arrays.asList(row.split("\|")).iterator();
}

},Encoders.STRING());
Dataset<Row> dataCount = data.select(new Column("value"));

StreamingQuery query = dataCount.writeStream()
.outputMode("append")
.format("console")
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

使用上述实现,查询执行 1 次,但是如果我在文件中添加新记录,则不会触发第二次批处理执行。

其他观察结果:

  • 输出模式完成并更新时,没有输出。仅在附加模式下,我才能获得 1 次输出。

有人可以帮助解决这个问题吗?Spark 结构化流式处理是否支持处理来自文件的数据,因为普通 Spark 流式处理不支持。

Spark 结构化流式处理是否支持处理文件中的数据

是的。

查询执行 1 次,但如果我在文件中添加新记录,则不会触发第二次批处理执行。

一旦文件被标记为已查看并且再也不会处理,这将无法正常工作(查看负责它的 FileStreamSource 以查找它在幕后的工作方式(。

建议的解决方案是将新内容写入新文件。

最新更新