如何从Spark StreamingContext的DStream中获取文件名



Event经过多次尝试和谷歌搜索,如果我使用流上下文,则无法获得fileName。我可以使用SparkContext的完整TextFiles,但之后我必须重新实现流上下文的功能。

注意:FileName(错误事件为json文件)是系统的输入,因此在输出中保留名称非常重要,以便在审计过程中跟踪任何事件。

注意:FileName的格式如下。SerialNumber部分可以从事件json中提取,但时间存储为毫秒,很难以可靠的方式获得以下格式,也无法找到计数器。…

每个文件只包含一行复杂的json字符串。使用流上下文,我可以创建一个RDD[String],其中每个字符串都是来自单个文件的json字符串。任何人都可以有任何解决方案/解决方法来将字符串与相应的文件名相关联吗。

val sc = new SparkContext("local[*]", "test")
val ssc = new StreamingContext(sc, Seconds(4))
val dStream = ssc.textFileStream(pathOfDirToStream)
dStream.foreachRDD { eventsRdd => /* How to get the file name */ }

您可以使用fileStream并创建自己的FileInputFormat来完成此操作,类似于使用InputSplit提供文件名作为Key的TextInputFormat。然后,您可以使用fileStream获得一个带有文件名和行的DStream。

Hi为了从DStream中获取文件名,我创建了一个java函数,该函数使用java spark api获取文件路径,而在spark streaming(用scala编写)中,我调用了该函数。下面是一个java代码示例:

import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopPartition;
import org.apache.spark.rdd.UnionPartition;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.Partition;
public class GetFileNameFromStream implements Serializable{

   public String getFileName(Partition partition)
   {
       UnionPartition upp = (UnionPartition)partition;
       NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
       String filePath=npp.serializableHadoopSplit().value().toString();
      return filePath;
    }
 }

在spark streaming中,我调用了上面的java函数这是的代码示例

val obj =new GetFileNameFromStream
dstream.transform(rdd=>{
   val lenPartition = rdd.partitions.length
   val listPartitions = rdd.partitions
   for(part <-listPartitions){
    var filePath=obj.getFileName(part)
 })

相关内容

  • 没有找到相关文章

最新更新