这是我的数据集
reloadmonthly
DataFrame[year: string, month: string, msisdn: string, reload_min: double, reload_max: double, reload_avg: double, reload_sum: double, rembal_min: string, rembal_max: string, rembal_avg: double, rembal_sum: double, period: string, application_type: string, periodloan: string, ix: string, last_x_month: double]
reloadmonthly.show(2)
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
|year|month| msisdn|reload_min|reload_max|reload_avg|reload_sum|rembal_min|rembal_max|rembal_avg|rembal_sum|period|application_type|periodloan| ix|last_x_month|
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
|2019| 10| 628176789488| 5000.0| 5000.0| 5000.0| 5000.0| 5189.0| 5189.0| 5189.0| 5189.0|201910| 10| 202001| 1| 1.0|
|2019| 10|6281802031321| 25000.0| 25000.0| 25000.0| 25000.0| 25633.0| 25633.0| 25633.0| 25633.0|201910| 100| 202001| 1| 2.0|
+----+-----+-------------+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+
only showing top 2 rows
这是我的代码
# Keep one row per subscriber (msisdn) before attaching the pivoted sums.
reloadid = reloadmonthly.dropDuplicates(["msisdn"])
# BUG (the cause of the AnalysisException below): the pivot runs on
# `packetmonthly`, whose columns are packet_* — it has no `reload_sum`.
# `reload_sum` only exists in `reloadmonthly`/`reloadid`.
reloadid = reloadid.join(
packetmonthly.withColumn("p", F.expr("concat('reload_sum_l', last_x_month)"))
.groupBy("msisdn")
.pivot("p")
.sum("reload_sum"),
on=["msisdn"],
how="left_outer",
)
这是错误信息
AnalysisException: 'Cannot resolve column name "reload_sum" among (year, month, msisdn, packet_min, packet_max, packet_avg, packet_sum, period, application_type, periodloan, ix, last_x_month, p);'
您在join
之前(内部)执行pivot
。因此,您正在尝试透视packetmonthly
,它显然不包含任何列reload_sum
(此列出现在reloadmonthly
中)。我重新排版了您的代码,以突出显示您在连接内部执行 pivot 的那一部分。
也许,您只需要先做连接、再做 pivot——我无法实际测试,因为您没有给出packetmonthly
的定义,但代码大致应该是这样的:
# Fixed version: join FIRST, then pivot — after the join the frame contains
# `reload_sum` (from reloadid), so the pivot's aggregation column resolves.
reloadid = (
reloadid.join(
packetmonthly,
on=["msisdn"],
how="left_outer",
)
# Pivot key per row, e.g. "reload_sum_l1.0", derived from last_x_month.
.withColumn("p", F.expr("concat('reload_sum_l', last_x_month)"))
.groupBy("msisdn")
.pivot("p")
.sum("reload_sum")
)
package somePackage.notepad
import somePackage.SparkSessionTestWrapper
import org.apache.spark.sql.DataFrame
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.apache.spark.sql.functions.{col, expr}
/** Reproduces the Stack Overflow fix: pivot on the frame that actually
  * contains `reload_sum`, then left-join the pivoted columns back by msisdn.
  */
class NotepadSpec extends AnyFunSuite with SparkSessionTestWrapper with Matchers {

  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  test("Notepad test") {
    // Two distinct subscribers; their differing last_x_month values make the
    // pivot produce two columns ("reload_sum1.0" and "reload_sum2.0").
    val inputData: Seq[NotepadInput] = Seq(
      NotepadInput(
        year = "2019",
        month = "10",
        msisdn = "628176789488",
        reload_min = 5000.0,
        reload_max = 5000.0,
        reload_avg = 5000.0,
        reload_sum = 5000.0,
        rembal_min = "5189.0",
        rembal_max = "5189.0",
        rembal_avg = 5189.0,
        rembal_sum = 5189.0,
        period = "201910",
        application_type = "10",
        periodloan = "202001",
        ix = "1",
        last_x_month = 1.0
      ),
      NotepadInput(
        year = "2019",
        month = "10",
        msisdn = "6281802031321",
        reload_min = 25000.0,
        reload_max = 25000.0,
        reload_avg = 25000.0,
        reload_sum = 25000.0,
        rembal_min = "25633.0",
        rembal_max = "25633.0",
        rembal_avg = 25633.0,
        rembal_sum = 25633.0,
        period = "201910",
        application_type = "10",
        periodloan = "202001",
        ix = "1",
        last_x_month = 2.0
      )
    )

    // One row per subscriber, mirroring reloadmonthly.dropDuplicates(["msisdn"]).
    val df: DataFrame = inputData.toDF.dropDuplicates("msisdn")

    // Pivot a frame that CONTAINS reload_sum (this is the bug fix: the
    // original question pivoted packetmonthly, which lacks that column).
    val pivotedDf: DataFrame =
      df
        .withColumn("p", expr("concat('reload_sum', last_x_month)"))
        .groupBy("msisdn")
        .pivot("p")
        .sum("reload_sum")

    val outDf: DataFrame = df
      .join(pivotedDf, Seq("msisdn"), "left_outer")

    // show() returns Unit and prints the table itself; wrapping it in
    // println(...) additionally printed "()".
    outDf.show(false)

    // Actual assertions (Matchers was previously mixed in but unused).
    outDf.count() shouldBe 2L
    outDf.columns should contain allOf ("reload_sum1.0", "reload_sum2.0")

    val row1 = outDf.filter(col("msisdn") === "628176789488").head()
    row1.getAs[Double]("reload_sum1.0") shouldBe 5000.0
    row1.isNullAt(row1.fieldIndex("reload_sum2.0")) shouldBe true

    val row2 = outDf.filter(col("msisdn") === "6281802031321").head()
    row2.getAs[Double]("reload_sum2.0") shouldBe 25000.0
    row2.isNullAt(row2.fieldIndex("reload_sum1.0")) shouldBe true
  }
}
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
|msisdn |year|month|reload_min|reload_max|reload_avg|reload_sum|rembal_min|rembal_max|rembal_avg|rembal_sum|period|application_type|periodloan|ix |last_x_month|reload_sum1.0|reload_sum2.0|
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+
|6281802031321|2019|10 |25000.0 |25000.0 |25000.0 |25000.0 |25633.0 |25633.0 |25633.0 |25633.0 |201910|10 |202001 |1 |2.0 |null |25000.0 |
|628176789488 |2019|10 |5000.0 |5000.0 |5000.0 |5000.0 |5189.0 |5189.0 |5189.0 |5189.0 |201910|10 |202001 |1 |1.0 |5000.0 |null |
+-------------+----+-----+----------+----------+----------+----------+----------+----------+----------+----------+------+----------------+----------+---+------------+-------------+-------------+