Apache Spark with Cassandra behavior



I am writing a standalone Spark program that fetches its data from Cassandra. I followed the examples and created the RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but when I call the RDD's .groupByKey() method I get a NotSerializableException:

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");
    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);
    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
        ctx.newAPIHadoopRDD(jobConf,
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class, SortedMap.class);
    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

The exception:

    java.io.NotSerializableException: java.nio.HeapByteBuffer
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)

What I am trying to do is merge all the columns of each row key into a single entry. I get the same exception when I try to use the reduceByKey() method like this:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
            SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

I am using:

  • spark-1.0.0-bin-hadoop1
  • Cassandra 1.2.12
  • Java 1.6

Does anyone know what the problem is? What is causing the serialization to fail?

Thanks,
Shai

Your problem is most likely caused by the attempt to serialize the ByteBuffers. They are not serializable, so you need to convert them to byte arrays before building the RDD you shuffle.
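A minimal sketch of that conversion (the helper name toArray is mine, not part of any API): copy the buffer's remaining bytes into a plain byte[], reading through a duplicate so the original buffer's position is left untouched.

```java
import java.nio.ByteBuffer;

public class ByteBufferToArray {

    // Copy a ByteBuffer's remaining bytes into a serializable byte[].
    // Reading through duplicate() leaves the source buffer's position intact.
    static byte[] toArray(ByteBuffer buf) {
        byte[] bytes = new byte[buf.remaining()];
        buf.duplicate().get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("rowkey".getBytes());
        byte[] bytes = toArray(buf);
        System.out.println(bytes.length);    // 6 bytes copied
        System.out.println(buf.remaining()); // source buffer unchanged: 6
    }
}
```

In the Spark job you would apply such a conversion in a mapToPair() step before groupByKey(), so that only serializable types cross the shuffle. Note that byte[] itself uses identity-based equals()/hashCode(), so keys that should compare equal by content may need a further wrapper (for example a String) to group correctly.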

You should also try the official DataStax Cassandra driver for Spark.
