如何使用 ssc.filestream() 在 Java 中处理 zip 目录



我是Spark Streaming的新手。

我想监控和解压缩特定目录中的所有.zip文件。我已经引用了 http://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/并编写了以下代码

JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class);

但是,我发现 fileStream() 不处理 zip 文件 exsitedin/move 到指定的目录。

有什么我想念的吗?

你可以在这里使用 ZipFileInputFormat: https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop

并使用创建文件流

val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory)
files.foreachRDD{ rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { record =>
      process(record._1.toString, record._2)
    }
  }
}

其中record._1.toString是文件名,record._2是该文件的字节可写。如果您不希望 InputFormat 解压缩.zip,则需要不同的自定义 FileInputFormat 或必须修改 ZipFileInputFormat。

要对此进行测试 - 请确保要添加到someInputDirectory的.zip文件上次修改<1 分钟前,否则 SparkStreaming 将默认忽略它。

最新更新