出于单元测试的目的,我正在构建自己的HBaseResult对象,如下所示
val row = Bytes.toBytes( "row01" )
val cf = Bytes.toBytes( "cf" )
val cell1 = new KeyValue( row, cf, "v1".getBytes(), Bytes.toBytes( "file1" ) )
val cell2 = new KeyValue( row2, cf, "v2".getBytes(), Bytes.toBytes( "file2" ) )
val cells = List( cell1, cell2 )
val result = Result.create( cells )
现在我想把它添加到一个sparkContext对象中,比如
val sparkContext = new org.apache.spark.SparkContext( conf )
val rdd = sparkContext.parallelize( List( result ) )
然而,一旦我尝试通过foreach访问rdd,比如
rdd.foreach{x=>x}
我得到了著名的Spark Task Not serializable。
有人知道一种更好的方法来计算RDD[结果]吗?
Result
是不可序列化的,因此如果您想要一个RDD[Result]
,您必须从其他输入在节点本身上生成Result
s(当然,像collect
、first
这样在节点之间发送Result
s的操作也不起作用(。因此,例如
val rdd0 = sparkContext.parallelize( List( ("row", "cf") ) )
val rdd = rdd.map { case (str1, str2) =>
val row = Bytes.toBytes( str1 )
val cf = Bytes.toBytes( str2 )
val cell1 = new KeyValue( row, cf, "v1".getBytes(), Bytes.toBytes( "file1" ) )
val cell2 = new KeyValue( row2, cf, "v2".getBytes(), Bytes.toBytes( "file2" ) )
val cells = List( cell1, cell2 )
Result.create( cells )
}