我是Spark Streaming的新手。
我想监控和解压缩特定目录中的所有.zip文件。我已经引用了 http://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/并编写了以下代码
JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class);
但是,我发现 fileStream() 不处理 zip 文件 exsitedin/move 到指定的目录。
有什么我想念的吗?
你可以在这里使用 ZipFileInputFormat: https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop
并使用创建文件流
val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory)
files.foreachRDD{ rdd =>
rdd.foreachPartition { partition =>
partition.foreach { record =>
process(record._1.toString, record._2)
}
}
}
其中record._1.toString
是文件名,record._2
是该文件的字节可写。如果您不希望 InputFormat 解压缩.zip,则需要不同的自定义 FileInputFormat 或必须修改 ZipFileInputFormat。
要对此进行测试 - 请确保要添加到someInputDirectory
的.zip文件上次修改<1 分钟前,否则 SparkStreaming 将默认忽略它。