如何为单元测试生成RDD[结果]



出于单元测试的目的,我正在构建自己的HBaseResult对象,如下所示

val row = Bytes.toBytes( "row01" )
val cf = Bytes.toBytes( "cf" )
val cell1 = new KeyValue( row, cf, "v1".getBytes(), Bytes.toBytes( "file1" ) )
val cell2 = new KeyValue( row2, cf, "v2".getBytes(), Bytes.toBytes( "file2" ) )
val cells = List( cell1, cell2 )
val result = Result.create( cells )

现在我想把它添加到一个sparkContext对象中,比如

val sparkContext = new org.apache.spark.SparkContext( conf )
val rdd = sparkContext.parallelize( List( result ) )

然而,一旦我尝试通过foreach访问rdd,比如

rdd.foreach{x=>x}

我得到了著名的Spark Task Not serializable。

有人知道一种更好的方法来计算RDD[结果]吗?

Result是不可序列化的,因此如果您想要一个RDD[Result],您必须从其他输入在节点本身上生成Result s(当然,像collectfirst这样在节点之间发送Result s的操作也不起作用(。因此,例如

val rdd0 = sparkContext.parallelize( List( ("row", "cf") ) )
val rdd = rdd.map { case (str1, str2) =>
  val row = Bytes.toBytes( str1 )
  val cf = Bytes.toBytes( str2 )
  val cell1 = new KeyValue( row, cf, "v1".getBytes(), Bytes.toBytes( "file1" ) )
  val cell2 = new KeyValue( row2, cf, "v2".getBytes(), Bytes.toBytes( "file2" ) )
  val cells = List( cell1, cell2 )
  Result.create( cells )
}

最新更新