如何将Scala HashMap转换为Java HashMap以便序列化到磁盘



我有一个用Spark数据帧创建的Scala HashMap。我想将它转换为Java HashMap,并将其写入磁盘。稍后,我打算在生产环境中加载JavaHashMap,并在非火花环境中使用它。

到目前为止,我已经能够将Spark数据帧转换为Scala HashMap,如下所示:

val mydf1 = Seq((1, "a"), (2, "b"),(3, "c"),(4, "d"),(5, "e")).toDF("id", "col2")
mydf1.show
+---+----+
| id|col2|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
|  4|   d|
|  5|   e|
+---+----+
val mydfHash = mydf1.rdd.map{
case Row(routeItemKey: String, kwrExpectedScore: Double) => (routeItemKey, kwrExpectedScore)}.collectAsMap()

然而,当我尝试将上面的Scala HashMap转换为Java HashMap时,如下所示:

import java.util._
import scala.collection.JavaConverters._

mydfHash.asJava

我收到一个java.lang.OutOfMemoryError: Java heap space错误。

以下是我得到的堆栈跟踪日志供参考:

java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractMap.toString(AbstractMap.java:559)
at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
at .$print$lzycompute(<console>:10)
at .$print(<console>:6)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at org.apache.zeppelin.spark.SparkScala211Interpreter.scalaInterpret(SparkScala211Interpreter.scala:143)
at org.apache.zeppelin.spark.SparkScala211Interpreter$$anonfun$interpret$1$$anonfun$apply$2.apply(SparkScala211Interpreter.scala:122)
at org.apache.zeppelin.spark.SparkScala211Interpreter$$anonfun$interpret$1$$anonfun$apply$2.apply(SparkScala211Interpreter.scala:116)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

为什么collectAsMap()工作,而asJava失败?我以为collectAsMap也会收集每个RDD到Spark主节点。因此,如果collectAsMap没有失败,那么理想情况下asJava也不应该因为堆内存不足而失败。

更新1

我真的需要把Scala哈希映射转换成Java哈希映射吗?在Java环境中,是否不可能将Scala HashMap序列化导出到文件中,并将此Scala HashMap加载到Java HashMap中?因为Scala和Java都在JVM中运行。

为什么collectAsMap()工作但asJava失败?

根据我的理解,以下是理由:

当您使用asJava时,它在内部使用StringBuilder来创建一个数组,而StringBuilder试图构建一个大于Integer.MAX_VALUE的数组(不能分配一个包含多个Integer.MAX.VALUE元素的数组)。使用StringBuilder,您可以累积1207959550个字符,远远低于Integer.MAX_VALUE.

希望这能有所帮助。

最新更新