How to resolve an out-of-memory error in Scala code when querying a Cassandra table



I have millions of records in a Cassandra table and want to fetch all of the partition-key column values. I am getting a java.lang.OutOfMemoryError: Java heap space error. See my sample code below.

 // Step 1: pull every partition-key row to the driver. Note that collect()
 // materializes all rows in the driver's heap.
 val rowKeyRdd: Array[CassandraRow] =
   sc.cassandraTable(keyspace, table)
     .select("customer_id", "uniqueaddress")
     .collect()

 // Step 2: build the list of partition keys.
 val clientPartitionKeys = rowKeyRdd.map(x => ClientPartitionKey(
   x.getString("customer_id"), x.getString("uniqueaddress"))).toList

 // Step 3: join the keys back against the table, restricted to the date range.
 val clientRdd: RDD[CassandraRow] =
   sc.parallelize(clientPartitionKeys)
     .joinWithCassandraTable(keyspace, table)
     .where("eventtime >= ?", startDate)
     .where("eventtime <= ?", endDate)
     .map(x => x._2)

 clientRdd.cache()

My goal is to fetch all rows within a given date range. So I applied the following logic: first fetch all partition keys from the table, then use that list of partition keys to fetch all records between the start and end dates.

But with this approach I get the following error while running the Scala code:

 ERROR 2016-02-11 13:05:54 org.apache.spark.util.Utils: Uncaught exception in thread task-result-getter-1
java.lang.OutOfMemoryError: Java heap space
at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3476) ~[na:1.8.0_66]
at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3282) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1792) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) ~[na:1.8.0_66]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) ~[na:1.8.0_66]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) ~[na:1.8.0_66]
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:95) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1652) ~[spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) [spark-core_2.10-1.4.2.2.jar:1.4.2.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: Java heap space

Please suggest any solution for the code above.

Rather than solving this in code, try increasing the heap space available to the application by passing -Xmx8g (or however much RAM is needed) in the JVM launch configuration.
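For a Spark application, the driver JVM's heap is normally sized at launch time rather than from inside the code. A minimal sketch, assuming the job is started via spark-submit; the main class and jar name below are placeholders, not from the original post:

 # Raise the driver heap to 8 GB; --driver-memory sets -Xmx for the driver JVM.
 spark-submit \
   --class com.example.ClientJob \
   --driver-memory 8g \
   your-job-assembly.jar

Note that the stack trace above comes from the driver-side task-result getter (org.apache.spark.scheduler.TaskResultGetter), so it is the driver's heap, not the executors', that is being exhausted here.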
