How to read compressed Spark event logs?

When I try to read a Spark 2.4.4 event log compressed with lz4, I get an empty DataFrame:

cd /opt/spark-2.4.4-bin-hadoop2.7
bin/spark-shell --master=local --conf spark.eventLog.enabled=true --conf spark.eventLog.compress=true --conf spark.io.compression.codec=lz4 --driver-memory 4G --driver-library-path=/opt/hadoop-2.7.1/lib/native/
// Trying to read an event log from a previous session
spark.read.option("compression", "lz4").json(s"file:///tmp/spark-events/local-1589202668377.lz4")
// res0: org.apache.spark.sql.DataFrame = []                                       

However, when I read an uncompressed event log, it works fine:

bin/spark-shell --master=local --conf spark.eventLog.enabled=true --conf spark.eventLog.compress=false
spark.read.json(s"file:///tmp/spark-events/${sc.applicationId}.inprogress").printSchema
//root
// |-- App ID: string (nullable = true)
// |-- App Name: string (nullable = true)
// |-- Block Manager ID: struct (nullable = true)
// |    |-- Executor ID: string (nullable = true)

I also tried reading an event log compressed with snappy, with the same result.
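
For reference, the snappy attempt was the equivalent of the following (a sketch; the exact file name is hypothetical):

bin/spark-shell --master=local --conf spark.eventLog.enabled=true --conf spark.eventLog.compress=true --conf spark.io.compression.codec=snappy
// Reading the resulting .snappy event log gives the same empty DataFrame
spark.read.option("compression", "snappy").json("file:///tmp/spark-events/local-1589202668377.snappy")
// res0: org.apache.spark.sql.DataFrame = []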

Try setting the codec explicitly before reading:

spark.conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
spark.read.json("dbfs:/tmp/compress/part-00000.lz4")

If that does not work, your lz4 files are most likely incompatible with org.apache.hadoop.io.compress.Lz4Codec. Here is a link to an open issue describing the same problem: lz4 incompatibility between OS and Hadoop.
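
A quick way to check which format you actually have (a minimal sketch, reusing the file path from the question): if Spark's own LZ4CompressionCodec can open the stream and the first bytes come back as JSON text, the log was written in Spark's internal block format, which is not what org.apache.hadoop.io.compress.Lz4Codec or spark.read expect.

import java.io.FileInputStream
import org.apache.spark.io.LZ4CompressionCodec

val codec = new LZ4CompressionCodec(sc.getConf)
// Try to open the event log with Spark's block codec
val in = codec.compressedInputStream(new FileInputStream("/tmp/spark-events/local-1589202668377.lz4"))
val head = new Array[Byte](64)
in.read(head)
// If this prints the beginning of a JSON event, the file is in Spark's block format
println(new String(head, "UTF-8"))
in.close()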

I can actually decompress the file first, using the Spark codec, and then read the decompressed file:

import java.io.{FileInputStream, FileOutputStream}
import org.apache.commons.io.IOUtils
import org.apache.spark.io.LZ4CompressionCodec
val inFile = "/tmp/spark-events/local-1589202668377.lz4"
val outFile = "/tmp/spark-events/local-1589202668377" 
// Spark wrote the event log with its internal LZ4 block codec, so use the same codec to decompress it
val codec = new LZ4CompressionCodec(sc.getConf)
val is = codec.compressedInputStream(new FileInputStream(inFile))
val os = new FileOutputStream(outFile)
IOUtils.copyLarge(is, os)
os.close()
is.close()
spark.read.json(outFile).printSchema

It would be better if I could read inFile directly without going through temporary storage, but at least I can get at the data.
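
One way to skip the temporary file entirely (a sketch; it materializes the whole decompressed log in driver memory, so it only suits logs that fit on the driver) is to decompress in memory and hand the JSON lines to spark.read.json as a Dataset. The Dataset[String] overload of spark.read.json exists since Spark 2.2, so it is available on 2.4.4:

import java.io.FileInputStream
import scala.io.Source
import org.apache.spark.io.LZ4CompressionCodec
import spark.implicits._

val inFile = "/tmp/spark-events/local-1589202668377.lz4"
val codec = new LZ4CompressionCodec(sc.getConf)
val is = codec.compressedInputStream(new FileInputStream(inFile))
// Force the iterator with toList so the stream can be closed before reading
val lines = Source.fromInputStream(is, "UTF-8").getLines().toList
is.close()
spark.read.json(spark.createDataset(lines)).printSchema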
