给定如下数据:
val my_data = sc.parallelize(Array(
"Key1, foobar, 10, twenty, 20",
"Key2, impt, 11, sixty, 6",
"Key3, helloworld, 110, seventy, 9"))
我想过滤并创建一个key,value
RDD,如下所示:
key1, foobar
key1, twenty
key2, impt
key2, sixty
key3, helloworld
key3, seventy
我尝试过什么
我想我可以将数据放在一个表中,然后推断数据类型。
//is there a way to avoid writing to file???
my_data.coalesce(1).saveAsTextFile("/tmp/mydata.csv")
val df_mydata = sqlContext.read
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.load("/tmp/mydata.csv")
以上工作使我有一个具有正确数据类型的表。但是,我不知道如何过滤数据类型,然后从中创建键/值对。
我也可以使用Character.isDigit
而不是创建模式,但仍然需要知道如何过滤键/值对
正如您提到的,解决它的一种方法是将Character.isDigit
与split
一起使用和flatMap
。以您的my_data
为例:
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val df = my_data.map(_.split(",").map(_.trim).toList.filterNot(s => s.forall(_.isDigit)))
.flatMap{case ((key: String)::tail) => tail.map(t => (key, t))}.toDF("Key", "Value")
df.show()
这会给你这样的东西:
+----+----------+
| Key| Value|
+----+----------+
|Key1| foobar|
|Key1| twenty|
|Key2| impt|
|Key2| sixty|
|Key3|helloworld|
|Key3| seventy|
+----+----------+
在这里,我也将其转换为数据帧,但是如果您想要rdd,则可以简单地跳过该步骤。 要使其正常工作,每行都必须包含一个键,并且该键应位于字符串中的第一个位置。
希望对您有所帮助!
编辑:
所用命令的细分。
第一个映射遍历 rdd 中的每个字符串,对于每个字符串,应用以下内容(按顺序):
.split(",")
.map(_.trim)
.toList
.filterNot(s => s.forall(_.isDigit))
让我们以您的第一行为例:"Key1, foobar, 10, twenty, 20"
。首先,该行由","分隔,这将为您提供一个字符串数组Array("Key1", " foobar", " 10", " twenty", " 20")
。
接下来是map(_.trim)
它将修剪(删除单词前后的空格)数组中的每个元素,数组也被转换为列表(用于稍后 flatMap 中的大小写匹配):List("Key1", "foobar", "10", "twenty", "20")
.
filterNot
将删除所有字符均为数字的所有字符串。此处的forall
检查,以便每个字符都满足此条件。这将删除列表中的一些元素:List("Key1", "foobar", "twenty")
。
现在,完成过滤后,仅保留键后的分组:
flatMap{case ((key: String)::tail) => tail.map(t => (key, t))}
在这里,key
成为每行列表的第一个元素,跟随它成为"Key1"之前的示例行。tail
只是列表的其余部分。然后,对于不是key
值的每个元素,我们将其替换为元组(key, value)
。换句话说,每个元素(除了第一个元素,即key
)都成为包含key
和自身的元组。 此处使用flatMap
,否则您将获得元组列表,而不仅仅是所需的元组。
最后一种是使用toDF("Key", "Value")
将其转换为具有命名列的数据帧,请注意,这需要在开头(import spark.implicits._
)中使用的导入。