Event经过多次尝试和谷歌搜索,如果我使用流上下文,则无法获得fileName。我可以使用SparkContext的完整TextFiles,但之后我必须重新实现流上下文的功能。
注意:FileName(错误事件为json文件)是系统的输入,因此在输出中保留名称非常重要,以便在审计过程中跟踪任何事件。
注意:FileName的格式如下。SerialNumber部分可以从事件json中提取,但时间存储为毫秒,很难以可靠的方式获得以下格式,也无法找到计数器。…
每个文件只包含一行复杂的json字符串。使用流上下文,我可以创建一个RDD[String],其中每个字符串都是来自单个文件的json字符串。任何人都可以有任何解决方案/解决方法来将字符串与相应的文件名相关联吗。
val sc = new SparkContext("local[*]", "test")
val ssc = new StreamingContext(sc, Seconds(4))
val dStream = ssc.textFileStream(pathOfDirToStream)
dStream.foreachRDD { eventsRdd => /* How to get the file name */ }
您可以使用fileStream并创建自己的FileInputFormat来完成此操作,类似于使用InputSplit提供文件名作为Key的TextInputFormat。然后,您可以使用fileStream获得一个带有文件名和行的DStream。
Hi为了从DStream中获取文件名,我创建了一个java函数,该函数使用java spark api获取文件路径,而在spark streaming(用scala编写)中,我调用了该函数。下面是一个java代码示例:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopPartition;
import org.apache.spark.rdd.UnionPartition;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.Partition;
public class GetFileNameFromStream implements Serializable{
public String getFileName(Partition partition)
{
UnionPartition upp = (UnionPartition)partition;
NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
String filePath=npp.serializableHadoopSplit().value().toString();
return filePath;
}
}
在spark streaming中,我调用了上面的java函数这是的代码示例
val obj =new GetFileNameFromStream
dstream.transform(rdd=>{
val lenPartition = rdd.partitions.length
val listPartitions = rdd.partitions
for(part <-listPartitions){
var filePath=obj.getFileName(part)
})