I have a large DataFrame (around 1.2 GB) with the following structure:
+---------+--------------+------------------------------------------------------------------------------------------------------+
| country | date_data    | text                                                                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE \nT_NAME: name_1 \nT_IN: ind_1 \nT_C: c1ws12 \nT_ADD: Sec_1_P \n ...........\nT_R: 45 "   |
| "EEUU"  | "2016-10-03" | "T_D: QQAA \nT_NAME: name_2 \nT_IN: ind_2 \nT_C: c1ws12 \nT_ADD: Sec_1_P \n ...........\nT_R: 46ee"  |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_D: QQWE \nT_NAME: name_300000 \nT_IN: ind_65 \nT_C: c1ws12 \nT_ADD: Sec_1_P \n ......\nT_R: 47aa" |
+---------+--------------+------------------------------------------------------------------------------------------------------+

The row count is 300,000 and each "text" value is a string of roughly 5,000 characters.
I want to split the "text" field into these new fields:
+---------+------------+------+-------------+--------+--------+---------+--------+------+
| country | date_data  | t_d  | t_name      | t_in   | t_c    | t_add   | ...... | t_r  |
+---------+------------+------+-------------+--------+--------+---------+--------+------+
| EEUU    | 2016-10-03 | QQWE | name_1      | ind_1  | c1ws12 | Sec_1_P | ...... | 45   |
| EEUU    | 2016-10-03 | QQAA | name_2      | ind_2  | c1ws12 | Sec_1_P | ...... | 46ee |
| .       | .          | .    | .           | .      | .      | .       | .      | .    |
| .       | .          | .    | .           | .      | .      | .       | .      | .    |
| EEUU    | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa |
+---------+------------+------+-------------+--------+--------+---------+--------+------+

Currently I am solving this with regular expressions. First, I write the regexes and create a function that extracts each individual field from the text (90 regexes in total):
val D_text = "((?<=T_D: ).*?(?=\\n))".r
val NAME_text = "((?<=\\nT_NAME: ).*?(?=\\n))".r
val IN_text = "((?<=T_IN: ).*?(?=\\n))".r
val C_text = "((?<=T_C: ).*?(?=\\n))".r
val ADD_text = "((?<=T_ADD: ).*?(?=\\n))".r
.
.
.
.
val R_text = "((?<=T_R: ).*?(?=\\n))".r
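Each pattern just captures whatever sits between its label and the next newline. A quick check of one of them outside Spark (the sample string here is made up for illustration):

val sample = "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1"
D_text.findFirstIn(sample)                   // returns Some("QQWE")
D_text.findFirstIn("no labels in this text") // returns None - the UDF below maps this to "NULL"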
//UDF function:
def getFirst(pattern2: scala.util.matching.Regex) = udf(
  (url: String) => pattern2.findFirstIn(url) match {
    case Some(texst_new) => texst_new
    case None => "NULL"
    case null => "NULL"
  }
)
Then I create a new DataFrame (tbl_separate_fields) as the result of applying that function to extract each new field from the text with its regex:
val tbl_separate_fields = hiveDF.select(
  hiveDF("country"),
  hiveDF("date_data"),
  getFirst(D_text)(hiveDF("text")).alias("t_d"),
  getFirst(NAME_text)(hiveDF("text")).alias("t_name"),
  getFirst(IN_text)(hiveDF("text")).alias("t_in"),
  getFirst(C_text)(hiveDF("text")).alias("t_c"),
  getFirst(ADD_text)(hiveDF("text")).alias("t_add"),
  .
  .
  .
  .
  getFirst(R_text)(hiveDF("text")).alias("t_r")
)
Finally, I insert this DataFrame into a Hive table:
tbl_separate_fields.registerTempTable("tbl_separate_fields")
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data) SELECT * FROM tbl_separate_fields")
This solution takes about 1 hour over the whole DataFrame, so I would like to optimize it and reduce the execution time. Is there a way?
We are using Hadoop 2.7.1 and Apache Spark 1.5.1. The Spark configuration is:
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
Thanks in advance.
EDIT (updated data):
+---------+--------------+------------------------------------------------------------------------------------------------------+
| country | date_data    | text                                                                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE \nT_NAME: name_1 \nT_IN: ind_1 \nT_C: c1ws12 \nT_ADD: Sec_1_P \n ...........\nT_R: 45 "   |
| "EEUU"  | "2016-10-03" | "T_NAME: name_2 \nT_D: QQAA \nT_IN: ind_2 \nT_C: c1ws12 ...........\nT_R: 46ee"                      |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_NAME: name_300000 \nT_ADD: Sec_1_P \nT_IN: ind_65 \nT_C: c1ws12 \n ...........\nT_R: 47aa"        |
+---------+--------------+------------------------------------------------------------------------------------------------------+

Using regular expressions in this case is slow and also fragile.
If you know that all records have the same structure, i.e. all "text" values have the same number and order of "parts", the following code will work (for any number of columns), mainly taking advantage of the split function in org.apache.spark.sql.functions:
import org.apache.spark.sql.functions._
import scala.collection.mutable

// first - split "text" column values into Arrays
val textAsArray: DataFrame = inputDF
  .withColumn("as_array", split(col("text"), "\n"))
  .drop("text")
  .cache()
// get a sample (first row) to get column names, can be skipped if you want to hard-code them:
val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray
val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex
// add Column per columnName with the right value and drop the no-longer-needed as_array column
val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {
  case (df, (colName, index)) => df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1))
}.drop("as_array")
withValueColumns.show()
// for the sample data I created,
// with just 4 "parts" in "text" column, this prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN| T_C|
// +-------+----------+----+------+-----+------+
// | EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// | EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|
// +-------+----------+----+------+-----+------+
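To slot this back into the original pipeline, the result can be registered and inserted into Hive just like before. A rough sketch, assuming the extracted header names only need lowercasing to line up with the target columns (t_d, t_name, ..., t_r) and that the column order already matches the target table:

// rename the generated columns, e.g. "T_D" -> "t_d", to match the target schema
val renamed = withValueColumns.columns.foldLeft(withValueColumns) {
  case (df, c) => df.withColumnRenamed(c, c.toLowerCase)
}
renamed.registerTempTable("tbl_separate_fields")
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data) SELECT * FROM tbl_separate_fields")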
Alternatively, if the assumption above does not hold, you can use a UDF to convert the text column into a Map, and then perform a similar foldLeft operation over a hard-coded list of the required columns:
import sqlContext.implicits._
// sample data: not the same order, not all records have all columns:
val inputDF: DataFrame = sc.parallelize(Seq(
  ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),
  ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2")
)).toDF("country", "date_data", "text")
// hard-coded list of expected column names:
val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")
// UDF to convert text into key-value map
val asMap = udf[Map[String, String], String] { s =>
  s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap
}
val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")
// for each column name - lookup the value in the map
val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {
  case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName))
}.drop("textAsMap")
withValueColumns.show()
// prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN| T_C|
// +-------+----------+----+------+-----+------+
// | EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// | EEUU|2016-10-03|QQAA|name_2|ind_2| null|
// +-------+----------+----+------+-----+------+
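One difference from the original getFirst UDF: missing fields come back as real nulls (see the last column above) rather than the string "NULL". If the target table expects the literal string, the same hard-coded column list can be reused to fill them in before the insert, e.g. (assuming org.apache.spark.sql.functions._ is imported as above):

// replace nulls in the extracted columns with the literal string "NULL"
val withNullStrings = columnNames.foldLeft(withValueColumns) {
  case (df, colName) => df.withColumn(colName, coalesce(col(colName), lit("NULL")))
}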