我使用apache spark 2.2.0和scala。
我正在遵循这个问题作为枢轴框架的指南,而无需使用枢轴函数。
我需要在不使用Pivot函数的情况下旋转数据框,因为我具有非数字数据,并且pivot
仅在数值数据上使用sum
,min
,max
等聚合函数。我想在pivot
聚合中使用一个非数字列。
这是我的数据:
+---+-------------+----------+-------------+----------+-------+
|Qid| Question|AnswerText|ParticipantID|Assessment| GeoTag|
+---+-------------+----------+-------------+----------+-------+
| 1|Question1Text| Yes| abcde1| 0|(x1,y1)|
| 2|Question2Text| No| abcde1| 0|(x1,y1)|
| 3|Question3Text| 3| abcde1| 0|(x1,y1)|
| 1|Question1Text| No| abcde2| 0|(x2,y2)|
| 2|Question2Text| Yes| abcde2| 0|(x2,y2)|
+---+-------------+----------+-------------+----------+-------+
我希望它通过ParticipantID
,Assessment
和GeoTag
标签进行分组,并在Question
列上进行"枢轴",并从AnswerText
列中获取值。最后,输出应如下:
+-------------+-----------+----------+-------+-----+----- +
|ParticipantID|Assessment |GeoTag |Qid_1 |Qid_2|Qid_3 |
+-------------+-----------+----------+-------+-----+------+
|abcde1 |0 |(x1,y1) |Yes |No |3 |
|abcde2 |0 |(x2,y2) |No |Yes |null |
+-------------+-----------+----------+-------+-----+------+
我尝试了以下方法:
val questions: Array[String] = df.select("Q_id")
.distinct()
.collect()
.map(_.getAs[String]("Q_id"))
.sortWith(_<_)
val df2: DataFrame = questions.foldLeft(df) {
case (data, question) => data.selectExpr("*", s"IF(Q_id = '$question', AnswerText, 0) AS $question")
}
[接着是groupby表达式]
但是我遇到以下错误,这与最终语句AS $question
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: *
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
我出错了什么想法?有没有更好的办法?如果需要,我想到要恢复到熊猫和python外面的火花,但是如果可能的话,我宁愿在同一框架中写下所有代码。
as $question
将问题变量的值替换为sql语句,您最终以'?在SQL中。?
不是列名中的有效字符,因此您必须至少使用backticks来引用:
s"IF(Q_id = '$question', AnswerText, 0) AS `$question`"
或使用select
/withColumn
:
import org.apache.spark.sql.functions.when
case (data, question) =>
data.withColumn(question, when($"Q_id" === question, $"AnswerText"))
首先使用regexp_replace
。
需要在不使用枢轴函数的情况下枢转数据框,因为我具有非数字数据,而df.pivot仅适用于汇总函数,例如sum,min,max on数字数据。
您可以使用first
:如何使用枢轴并在非数字列上计算平均值(面对AnalySisexception&quot"不是数字列;)?
data.groupBy($"ParticipantID", $"Assessment", $"GeoTag")
.pivot($"Question", questions).agg(first($"AnswerText"))
只是 @user8371915接受的答案的注释,可以使查询更快。
有一种方法可以避免使用标头产生questions
的昂贵扫描。
简单地生成标头(在同一作业和阶段!),然后在列上进行pivot
。
// It's a simple and cheap map-like transformation
val qid_header = input.withColumn("header", concat(lit("Qid_"), $"Qid"))
scala> qid_header.show
+---+-------------+----------+-------------+----------+-------+------+
|Qid| Question|AnswerText|ParticipantID|Assessment| GeoTag|header|
+---+-------------+----------+-------------+----------+-------+------+
| 1|Question1Text| Yes| abcde1| 0|(x1,y1)| Qid_1|
| 2|Question2Text| No| abcde1| 0|(x1,y1)| Qid_2|
| 3|Question3Text| 3| abcde1| 0|(x1,y1)| Qid_3|
| 1|Question1Text| No| abcde2| 0|(x2,y2)| Qid_1|
| 2|Question2Text| Yes| abcde2| 0|(x2,y2)| Qid_2|
+---+-------------+----------+-------------+----------+-------+------+
将标题作为数据集的一部分,让我们旋转。
val solution = qid_header
.groupBy('ParticipantID, 'Assessment, 'GeoTag)
.pivot('header)
.agg(first('AnswerText))
scala> solution.show
+-------------+----------+-------+-----+-----+-----+
|ParticipantID|Assessment| GeoTag|Qid_1|Qid_2|Qid_3|
+-------------+----------+-------+-----+-----+-----+
| abcde1| 0|(x1,y1)| Yes| No| 3|
| abcde2| 0|(x2,y2)| No| Yes| null|
+-------------+----------+-------+-----+-----+-----+