Spark keeps streaming from a local directory even when it contains no files or data



I have a problem with my code: Spark keeps streaming even when there is no data or file in my local directory. I am running Spark on my local system for now, though I will eventually deploy to production. In any case, I am new to Spark. Even when I add three files, Spark picks up only one and does not stream the remaining files, but my main concern is the continuous streaming. Please tell me how I can control and stop this continuous streaming when there are no files or data. I am using Spring Boot with Java. My source code is below:

@Component
public class FileConverter {

    @Value("${output.path}")
    protected String source;

    @Autowired
    SparkSessionConfig session;

    public void processFileAndConvertToExcel() throws TimeoutException, StreamingQueryException {
        SparkSession sessionconfig = session.getSession();
        System.out.println(sessionconfig.version());
        sessionconfig.sparkContext().setLogLevel("ERROR");

        // Define the schema of the file data source.
        StructType schema = new StructType().add("filedata", DataTypes.StringType, true);

        // Build the streaming reader over the file source, expecting CSV files.
        Dataset<Row> rawData = sessionconfig.readStream().format("csv").schema(schema)
                .csv(source);
        rawData.createOrReplaceTempView("filedata");
        Dataset<Row> results = sessionconfig.sql("select * from filedata"); // note: never used

        StreamingQuery query1 = rawData.writeStream().format("console")
                .outputMode(OutputMode.Update()).start();
        query1.awaitTermination(); // blocks here: the query never terminates on its own
        System.out.println("end of job ..............");
    }
*This test method, when called for streaming, does not even read any files, yet it too continues to stream:*
    public void test() throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[5]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaDStream<String> lines = jssc.textFileStream(source);
        lines.print();
        lines.foreachRDD((rdd, time) -> {
            // Note: calling lines.print() in here would fail; output operations
            // cannot be added after the streaming context has started.
            if (!rdd.isEmpty()) {
                rdd.foreach(f -> System.out.println(f));
            }
        });

        jssc.start();            // start the computation
        jssc.awaitTermination(); // wait for the computation to terminate
    }
}
@Component
public class SparkSessionConfig {

    public SparkSession getSession() {
        System.out.println("creating spark session ,,,,,,,,,,,,,,, ");
        System.setProperty("hadoop.home.dir", "C:/winutils");
        SparkSession spark = SparkSession.builder().appName("File Converter")
                // the correct key is spark.sql.warehouse.dir, not spark.sql.dir.warehouse.dir
                .config("spark.sql.warehouse.dir", "file:///c:/tmp/")
                .master("local[5]")
                .getOrCreate();
        System.out.println("spark session created ,,,,,,,,,,,,,,, ");
        return spark;
    }
}

Here is a summary of my console output:

2021-01-13 03:09:42.711  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.datasources.InMemoryFileIndex  : It took 1 ms to list leaf files for 1 paths.
2021-01-13 03:09:42.723  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.datasources.InMemoryFileIndex  : It took 1 ms to list leaf files for 1 paths.
2021-01-13 03:09:42.734  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.datasources.InMemoryFileIndex  : It took 1 ms to list leaf files for 1 paths.
2021-01-13 03:09:42.736  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.streaming.MicroBatchExecution  : Streaming query made progress: {
  "id" : "2445367b-fc80-416e-9ff5-0cddd1af7bb2",
  "runId" : "1dc94f26-0bde-4447-b204-7d949befed27",
  "name" : null,
  "timestamp" : "2021-01-13T11:09:42.733Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 2,
    "triggerExecution" : 2
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/C:/Users/DELL/Desktop/sourcefolders]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@2ab718be",
    "numOutputRows" : 0
  }
}
2021-01-13 03:09:42.748  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.datasources.InMemoryFileIndex  : It took 1 ms to list leaf files for 1 paths.
2021-01-13 03:09:42.761  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.datasources.InMemoryFileIndex  : It took 1 ms to list leaf files for 1 paths.
2021-01-13 03:09:42.773  INFO 12784 --- [4-7d949befed27]] o.a.s.s.e.datasources.InMemoryFileIndex  : It took 1 ms to list leaf files for 1 paths.

Could anyone please help me look into this?

After importing the Trigger class, try setting the trigger to Trigger.Once():

import org.apache.spark.sql.streaming.Trigger;

StreamingQuery query1 = rawData.writeStream()
        .format("console")
        .trigger(Trigger.Once())
        .outputMode(OutputMode.Update())
        .start();
query1.awaitTermination();
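
With Trigger.Once() the query processes everything currently sitting in the source directory as a single micro-batch and then terminates on its own, so awaitTermination() returns instead of blocking forever. If the directory is empty, the batch simply reads zero rows and the query still stops cleanly. As a minimal sketch of the whole job with this trigger (same schema and source path as in the question; the standalone main method and hard-coded path are just for illustration):

import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class OneShotFileConverter {

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .appName("File Converter")
                .master("local[5]")
                .getOrCreate();
        spark.sparkContext().setLogLevel("ERROR");

        // Same single-column schema as in the question.
        StructType schema = new StructType().add("filedata", DataTypes.StringType, true);

        Dataset<Row> rawData = spark.readStream()
                .format("csv")
                .schema(schema)
                .csv("C:/Users/DELL/Desktop/sourcefolders"); // source path from the question

        // Process all files currently in the directory as one batch, then stop.
        StreamingQuery query = rawData.writeStream()
                .format("console")
                .outputMode(OutputMode.Update())
                .trigger(Trigger.Once())
                .start();

        query.awaitTermination(); // returns once the single batch has finished
        System.out.println("end of job");
        spark.stop();
    }
}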

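Alternatively, if the job should keep running while new files arrive and only shut down after the source directory has been quiet for a while, one possible pattern (my own suggestion, not part of the answer above) is to poll the query's progress on the driver and stop it after several consecutive idle polls:

// Hedged sketch: stop the continuous query once its last reported
// progress has shown zero input rows for several polls in a row.
// maxIdlePolls and the 2-second interval are illustrative values.
// The enclosing method must declare throws InterruptedException
// and TimeoutException (query.stop() can throw the latter).
StreamingQuery query = rawData.writeStream()
        .format("console")
        .outputMode(OutputMode.Update())
        .start();

int idlePolls = 0;
final int maxIdlePolls = 5;
while (query.isActive() && idlePolls < maxIdlePolls) {
    Thread.sleep(2000); // poll the driver-side progress every 2 seconds
    org.apache.spark.sql.streaming.StreamingQueryProgress progress = query.lastProgress();
    if (progress == null) {
        continue; // no batch has completed yet
    }
    if (progress.numInputRows() == 0) {
        idlePolls++;    // still no data in the latest reported batch
    } else {
        idlePolls = 0;  // data arrived, reset the idle counter
    }
}
query.stop(); // ends the streaming query and unblocks awaitTermination()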