如何在任意列上旋转



我使用apache spark 2.2.0和scala。

我正在遵循这个问题作为枢轴框架的指南,而无需使用枢轴函数。

我需要在不使用Pivot函数的情况下旋转数据框,因为我具有非数字数据,并且pivot仅在数值数据上使用summinmax等聚合函数。我想在pivot聚合中使用一个非数字列。

这是我的数据:

+---+-------------+----------+-------------+----------+-------+
|Qid|     Question|AnswerText|ParticipantID|Assessment| GeoTag|
+---+-------------+----------+-------------+----------+-------+
|  1|Question1Text|       Yes|       abcde1|         0|(x1,y1)|
|  2|Question2Text|        No|       abcde1|         0|(x1,y1)|
|  3|Question3Text|         3|       abcde1|         0|(x1,y1)|
|  1|Question1Text|        No|       abcde2|         0|(x2,y2)|
|  2|Question2Text|       Yes|       abcde2|         0|(x2,y2)|
+---+-------------+----------+-------------+----------+-------+

我希望它通过ParticipantIDAssessmentGeoTag标签进行分组,并在Question列上进行"枢轴",并从AnswerText列中获取值。最后,输出应如下:

+-------------+-----------+----------+-------+-----+----- +
|ParticipantID|Assessment |GeoTag    |Qid_1  |Qid_2|Qid_3 |
+-------------+-----------+----------+-------+-----+------+
|abcde1       |0          |(x1,y1)   |Yes    |No   |3     |
|abcde2       |0          |(x2,y2)   |No     |Yes  |null  |
+-------------+-----------+----------+-------+-----+------+

我尝试了以下方法:

val questions: Array[String] = df.select("Q_id")
      .distinct()
      .collect()
      .map(_.getAs[String]("Q_id"))
      .sortWith(_<_)
val df2: DataFrame = questions.foldLeft(df) {
      case (data, question) => data.selectExpr("*", s"IF(Q_id = '$question', AnswerText, 0) AS $question")
    }

[接着是groupby表达式]

但是我遇到以下错误,这与最终语句AS $question

的语法有关
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: *
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?

extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

我出错了什么想法?有没有更好的办法?如果需要,我想到要恢复到熊猫和python外面的火花,但是如果可能的话,我宁愿在同一框架中写下所有代码。

as $question将问题变量的值替换为sql语句,您最终以'?在SQL中。?不是列名中的有效字符,因此您必须至少使用backticks来引用:

s"IF(Q_id = '$question', AnswerText, 0) AS `$question`"

或使用select/withColumn

import org.apache.spark.sql.functions.when
case (data, question) => 
  data.withColumn(question, when($"Q_id" === question, $"AnswerText"))

首先使用regexp_replace

需要在不使用枢轴函数的情况下枢转数据框,因为我具有非数字数据,而df.pivot仅适用于汇总函数,例如sum,min,max on数字数据。

您可以使用first:如何使用枢轴并在非数字列上计算平均值(面对AnalySisexception&quot"不是数字列;)?

data.groupBy($"ParticipantID", $"Assessment", $"GeoTag")
  .pivot($"Question", questions).agg(first($"AnswerText"))

只是 @user8371915接受的答案的注释,可以使查询更快。


有一种方法可以避免使用标头产生questions的昂贵扫描。

简单地生成标头(在同一作业和阶段!),然后在列上进行pivot

// It's a simple and cheap map-like transformation
val qid_header = input.withColumn("header", concat(lit("Qid_"), $"Qid"))
scala> qid_header.show
+---+-------------+----------+-------------+----------+-------+------+
|Qid|     Question|AnswerText|ParticipantID|Assessment| GeoTag|header|
+---+-------------+----------+-------------+----------+-------+------+
|  1|Question1Text|       Yes|       abcde1|         0|(x1,y1)| Qid_1|
|  2|Question2Text|        No|       abcde1|         0|(x1,y1)| Qid_2|
|  3|Question3Text|         3|       abcde1|         0|(x1,y1)| Qid_3|
|  1|Question1Text|        No|       abcde2|         0|(x2,y2)| Qid_1|
|  2|Question2Text|       Yes|       abcde2|         0|(x2,y2)| Qid_2|
+---+-------------+----------+-------------+----------+-------+------+

将标题作为数据集的一部分,让我们旋转。

val solution = qid_header
  .groupBy('ParticipantID, 'Assessment, 'GeoTag)
  .pivot('header)
  .agg(first('AnswerText))
scala> solution.show
+-------------+----------+-------+-----+-----+-----+
|ParticipantID|Assessment| GeoTag|Qid_1|Qid_2|Qid_3|
+-------------+----------+-------+-----+-----+-----+
|       abcde1|         0|(x1,y1)|  Yes|   No|    3|
|       abcde2|         0|(x2,y2)|   No|  Yes| null|
+-------------+----------+-------+-----+-----+-----+

相关内容

  • 没有找到相关文章

最新更新