将一个较大的tar.gz文件分解为多个较小的tar.gz文件



在spark中处理大于1gb的tar.gz文件时,我得到OutOfMemoryError

为了克服这个错误,我尝试使用"split"命令将tar.gz拆分为多个部分,结果发现每个拆分都不是tar.gz,因此无法按原样处理。

dir=/dbfs/mnt/data/temp
b=524288000
for file in /dbfs/mnt/data/*.tar.gz; 
do 
a=$(stat -c%s "$file");
if [[ "$a" -gt "$b" ]] ; then 
split -b 500M -d --additional-suffix=.tar.gz $file "${file%%.*}_part"
mv $file $dir
fi
done

尝试处理分割文件时出错

Caused by: java.io.EOFException
at org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream.read(GzipCompressorInputStream.java:281)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.read(TarArchiveInputStream.java:590)
at org.apache.commons.io.input.ProxyInputStream.read(ProxyInputStream.java:98)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.Reader.read(Reader.java:140)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:2001)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1980)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1957)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1907)
at org.apache.commons.io.IOUtils.toString(IOUtils.java:778)
at org.apache.commons.io.IOUtils.toString(IOUtils.java:803)
at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:33)
at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$$anonfun$apply$1.apply(command-2152765781429277:31)
at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
at scala.collection.immutable.Stream.foreach(Stream.scala:595)
at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:316)
at scala.collection.AbstractTraversable.toMap(Traversable.scala:104)
at linea3796c25fa964697ba042965141ff28825.$read$$iw$$iw$$iw$$iw$$iw$$iw$Unpacker$.apply(command-2152765781429277:34)
at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)
at linea3796c25fa964697ba042965141ff28827.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-2152765781429278:3)

我有一个大小高达4gb的tar.gz文件,每个文件可以包含7000个大小从1mb到50mb不等的json文档。

如果我想把大的tar.gz文件分成更小的tar.gz文件,这是我唯一的选择,可以解压缩,然后根据文件大小或文件数量重新压缩?-"这是路吗">

普通的gzip文件是不可拆分的。GZip焦油档案更难处理。Spark可以处理gzipped json文件,但不能处理gzippedtar文件,也不能处理tar文件。Spark可以处理每个大约2GB的二进制文件。Spark可以处理连接在一起的JSON

我建议使用Pandas UDF或.pipe((运算符来处理每个tar gzippe文件(每个工作人员一个(。每个工作人员都会以流式方式解压缩、解压缩和处理每个JSON文档,而不会填充内存。希望您有足够的源文件来并行运行,并看到速度的提高。

您可能想探索流媒体方法,以将压缩的JSON文件增量交付给ADLS Gen 2/S3,并使用Databricks自动加载器功能在文件到达后立即加载和处理文件。

还有这个问题的答案如何在流数据集中加载tar.gz文件?看起来很有希望。

最新更新