有2个非常大的RDD(每个都有超过100万条记录),第一个是:
rdd1.txt(name,value):
chr1 10016
chr1 10017
chr1 10018
chr1 20026
chr1 20036
chr1 25016
chr1 26026
chr2 40016
chr2 40116
chr2 50016
chr3 70016
rdd2.txt(name,min,max):
chr1 10000 20000
chr1 20000 30000
chr2 40000 50000
chr2 50000 60000
chr3 70000 80000
chr3 810001 910000
chr3 860001 960000
chr3 910001 1010000
该值只有在第二个RDD的最小值和最大值之间的范围内才有效,如果该名称有效,则该名称出现的计数将加1
以上面的例子为例,chr1's出现7.
我怎么能得到的结果在scala与spark?
多谢
尝试:
val rdd1 = sc.parallelize(Seq(
("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(
("chr1", 10000, 20000), ("chr1",20000, 30000)))
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name"))
.where($"value".between($"min", $"max"))
据我所知,您希望rdd1的值落在rdd2的min和max之间。请查看下面是否有效
val rdd1 = sc.parallelize(Seq(("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018)))
val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000)))
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
scala> val rdd1=sc.parallelize(Seq(("chr1", 10016 ),("chr1", 10017 ),("chr1", 10018 ),("chr1", 20026 ),("chr1", 20036 ),("chr1", 25016 ),("chr1", 26026),("chr2", 40016 ),("chr2", 40116 ),("chr2", 50016 ),("chr3", 70016 )))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Seq(("chr1", 10000, 20000),("chr1", 20000 , 30000),("chr2", 40000 ,50000),("chr2", 50000 ,60000),("chr3", 70000 ,80000),("chr3", 810001 ,910000),("chr3", 860001 ,960000),("chr3", 910001 ,1010000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
+----+-----+
|name|count|
+----+-----+
|chr3| 1|
|chr1| 7|
|chr2| 3|
+----+-----+
编辑如果是从一个文件中读取,我会使用下面的
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
val sqlContext = new SQLContext(sc)
val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true)))
val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true)))
val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv")
val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv")
rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()
这将在所有节点上运行,不需要并行调用。此处引用文档
defparallelize [T](seq: seq [T], numSlices: Int =defaultParallelism)(隐式arg0: ClassTag[T]): RDD[T]分发一个本地Scala集合,形成一个RDD。