如何在RDD Spark中添加唯一值



嗨,我正在使用scala来识别行的第一字,并创建唯一值并在RDD中附加它。但是我不知道该怎么做。我是Scala的新手,所以如果这个问题听起来很la脚,请原谅。我正在尝试的样本在下面给出。

样本:

OBR|1|METABOLIC PANEL
OBX|1|Glucose
OBX|2|BUN
OBX|3|CREATININE
OBR|2|RFLX TO VERIFICATION
OBX|1|EGFR
OBX|2|SODIUM
OBR|3|AMBIGUOUS DEFAULT
OBX|1|POTASSIUM

我想检查第一个单词是否是 obr 不是OBR是否比我创建一个唯一值,并希望在OBR中附加 obx 直到我发现我想做的另外一个。但是我该怎么做呢?我从 HDFS 中带来了我的数据。

预期结果:

OBR|1|METABOLIC PANEL|OBR_filename_1
OBX|1|Glucose|OBR_filename_1
OBX|2|BUN|OBR_filename_1
OBX|3|CREATININE|OBR_filename_1
OBR|2|RFLX TO VERIFICATION|OBR_filename_2
OBX|1|EGFR|OBR_filename_2
OBX|2|SODIUM|OBR_filename_2
OBR|3|AMBIGUOUS DEFAULT|OBR_filename_3
OBX|1|POTASSIUM|OBR_filename_3

正如我的评论中提到的那样,这只能在一个核心上使用,除非有人能阐明我所缺少的东西,否则不应该使用SPARK完成。我假设该文件只是您示例中所述的HDFS上的文本文件。

val text: RDD[(String, Long)] = sc.textFile(<path>).zipWithIndex
val tupled: RDD[((String, Int, String), Int)] = text.map{case (r, i) => (r.split('|'), i)).map{case (s, i) => ((s(0), s(1).toInt, s(2)), i)}
val obrToFirstIndex: Array[(Int, Long)] = tupled.filter(_._1._1 == "OBR").map{case (t, i) => (t._2, i)}.reduceByKey(Math.min).collect()
val bcIndexes = sc.broadcast(obrToFirstIndex.sortBy(_._2))
val withObr = tupled.mapValues(i => bcIndexes.value.find(_._2 >= i).getOrElse(bcIndexes.value.last)._1)
val result: RDD[String] = withObr.map{case ((t1, t2, t2), obrind) => Array(t1, t2, t3, s"OBR_filaneme_$obrind").mkString("|")

在我当前的构图上,我无法测试上述内容,因此它可能会被一个错误或次要错别字所限制,但是这个想法就在那里。但是让我重申,这不是Spark的工作。

编辑:仅出现在我身上,因为只有一个部分您可以使用映射,然后编写代码在该分区中的Java/Scala中的方式。

您遇到的问题是发现不正确,需要不同的条件才能工作。这是我以前暗示的更简单的方法

val text: RDD[String] = sc.textFile(<path>)
val result: RDD[String] = text.mapPartitions{part =>
    var obrInd = 0
    part.map{r =>
        val code= r.split('|')(0)
        if(code == "OBR") obrInd += 1
        r + "|OBR_filename_" + obrInd
    }
}

最新更新