在spark中处理超过3GB的记录大小



当单个记录大小超过3GB时,我会出现以下异常`

java.lang.IllegalArgumentException
App > at java.nio.CharBuffer.allocate(CharBuffer.java:330)
App > at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:792)
App > at org.apache.hadoop.io.Text.decode(Text.java:412)
App > at org.apache.hadoop.io.Text.decode(Text.java:389)
App > at org.apache.hadoop.io.Text.toString(Text.java:280)
App > at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$createBaseRdd$1.apply(JsonFileFormat.scala:135)
App > at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$createBaseRdd$1.apply(JsonFileFormat.scala:135)

如何增加单个记录的缓冲区大小?

您的文件中可能有一行包含数组。在这里,您会遇到一个异常,因为您正试图构建一个太大的CharBuffer(很可能是一个在出界后变为负数的整数)。java中的最大数组/字符串大小为2^31-1(Integer.MAX_VALUE-1)(请参阅本线程)。你说你有一个3GB的记录,每个字符有1B,它包含30亿个字符,超过2^31,大约等于20亿。

TW你可以做的有点麻烦,但由于你只有一个大数组的键,所以它可能会起作用。您的json文件可能看起来像:

{
"key" : ["v0", "v1", "v2"... ]
}

或者像这样,但我认为在你的情况下是前者:

{
"key" : [
"v0", 
"v1", 
"v2",
... 
]
}

因此,您可以尝试将hadoop使用的行分隔符更改为",",如下所示。基本上,他们是这样做的:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
def nlFile(path: String) = {
val conf = new Configuration
conf.set("textinputformat.record.delimiter", ",")
sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(_._2.toString)
}

然后你可以读取你的数组,只需要自己用这样的东西删除JSON括号:

nlFile("...")
.map(_.replaceAll("^.*\[", "").replaceAll("\].*$",""))

请注意,如果您的记录可以包含字符"["one_answers"]",则您必须更加小心,但以下是想法。

相关内容

  • 没有找到相关文章

最新更新