我是R背景的Spark和Scala的新手。在对RDD进行了几次转换后,我得到了类型的RDD
Description: RDD[(String, Int)]
现在我想在StringRDD上应用一个正则表达式,从String中提取子字符串,并在新的列中只添加子字符串。
输入数据:
BMW 1er Model,278
MINI Cooper Model,248
我正在寻找的输出:
Input | Brand | Series
BMW 1er Model,278, BMW , 1er
MINI Cooper Model ,248 MINI , Cooper
其中Brand和Series是字符串RDD 中新计算的子字符串
到目前为止我所做的一切。
我可以使用正则表达式为字符串实现这一点,但我不能应用于所有行。
val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r //to look for BMW or MINI
然后我可以使用
brandRegEx.findFirstIn("hello this mini is bmW testing")
但是,我如何将它用于RDD的所有行,并应用不同的正则表达式来实现上述输出。
我读过这个代码片段,但不知道如何把它放在一起。
val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r
def getBrand(Col4: String) : String = Col4 match {
case brandRegEx(str) =>
case _ => ""
return 'substring
}
任何帮助都将不胜感激!
感谢
要将正则表达式应用于RDD中的每一项,您应该使用RDD map
函数,该函数使用某些函数(在本例中,是一个偏函数,以便提取到组成每一行的元组的两部分)来转换RDD中每一行:
import org.apache.spark.{SparkContext, SparkConf}
object Example extends App {
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("Example"))
val data = Seq(
("BMW 1er Model",278),
("MINI Cooper Model",248))
val dataRDD = sc.parallelize(data)
val processedRDD = dataRDD.map{
case (inString, inInt) =>
val brandRegEx = """^.*[Bb][Mm][Ww]+|.[Mm][Ii][Nn][Ii]+.*$""".r
val brand = brandRegEx.findFirstIn(inString)
//val seriesRegEx = ...
//val series = seriesRegEx.findFirstIn(inString)
val series = "foo"
(inString, inInt, brand, series)
}
processedRDD.collect().foreach(println)
sc.stop()
}
请注意,我认为正则表达式中存在一些问题,并且还需要一个正则表达式来查找序列。该代码输出:
(BMW 1er Model,278,BMW,foo)
(MINI Cooper Model,248,NOT FOUND,foo)
但是,如果您根据需要更正正则表达式,这就是将它们应用于每一行的方法。
嗨,我正在寻找另一个问题,得到了这个问题。上述问题可以使用法线变换来解决。
val a=sc.parallelize(collection)
a.map{case (x,y)=>(x.split (" ")(0)+" "+x.split(" ")(1))}.collect