创建spark函数,接受key,value作为参数并返回RDD[string]



我想创建一个函数,稍后可以由三个不同的RDD数据集使用。函数接受键和值并转换为seq[String]

def ConvertToMap2(value: RDD[(String, (String,String,String,String,String,String))]): Seq[String]  = {
  value.collect().toMap.values.toSeq.map(x => x.toString.replace("(","").replace(")",""))
}

当我试图通过一个数据集应用它的ok,因为它有一个键与6个值的例子:-

val StatusRDD=ConvertToMap(FilterDataSet("1013").map(x => ((x(5)+x(4)),(x(5),x(4),x(1),x(6),x(7),x(8)))))

但我试图应用于另一个数据集,我需要我们写函数,因为其他数据集包含一个键的7个值,这使得用相同的逻辑重写函数,但不同的名称。

def ConvertToMap2(value: RDD[(String,(String,String,String,String,String,String,String))]): Seq[String]  = {
  value.collect().toMap.values.toSeq.map(x => x.toString.replace("(","").replace(")",""))
}
val LuldRDD2=ConvertToMap2(FilterDataSet("1041").map(x => ((x(5)+x(4)),(x(5),x(4),x(1),x(6),x(7),x(8),x(9)))))

是否有一种方法来编写一个函数,这两个接受6或7值的字符串只有一个键?

TupleX类继承自Product,所以我将这样定义函数:

def convertToSeq(rdd: RDD[(String, Product)]): Seq[String] = {
  rdd.values.map(x => x.productIterator.mkString).collect().toSeq
}

请注意,TupleX类有一个productIterator,我在这里使用它来创建字符串(我发现你的方式有点冗长,更难以阅读),我也延迟了collect调用,直到转换值之后,所以映射操作并行运行。

最后,我更改了函数的名称,因为它转换为Seq而不是Map

答案是需要使用任意数据类型

def ConvertToMap (value: RDD[(String,Any)]): Seq[String]  = {
   value.collect().toMap.values.toSeq.map(x => x.toString.replace("(","").replace(")",""))
}

最新更新