I want to use the groupBy operation with my own equality comparators. Let's assume I want to perform something like:
df.groupBy("Year","Month").sum("Counter")
on this dataframe:
Year | Month | Counter
---------------------------
2012 | Jan | 100
12 | January | 200
12 | Janu | 300
2012 | Feb | 400
13 | Febr | 500
I would have to implement two comparators:
1) On column Year: e.g. "2012" == "12"
2) On column Month: e.g. "Jan" == "January" == "Janu"
Let's assume I have already implemented these two comparators. How can I invoke them? As in this example, I already know that I have to transform my dataframe into an RDD to be able to use my comparators.
I thought about using RDD's groupBy.
Note that I really need to do this operation using comparators. I cannot use UDFs, change the data, or create new columns. The future idea is to have ciphertext columns, together with functions that let me check whether two ciphertexts are equal, and I want to use those functions in my comparators.
EDIT:
At the moment I am trying to do this operation with only one column, for example:
df.groupBy("Year").sum("Counter")
I have a wrapper class:
class ExampleWrapperYear (val year: Any) extends Serializable {
  // override hashCode and equals methods
}
and then I am doing:
val rdd = df.rdd.keyBy(a => new ExampleWrapperYear(a(0))).groupByKey()
My question here is how to perform the "sum", and how to use keyBy with multiple columns so that I can use both ExampleWrapperYear and ExampleWrapperMonth.
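I guess the sum could be done with something like the sketch below (assuming Counter is the third column of each Row, which may not match the real schema), and for multiple columns maybe a tuple of wrappers works as the key, since Scala tuples delegate equals/hashCode to their components:

// sketch only: sum Counter within each group, assuming it sits at column index 2
val summed = rdd.mapValues(rows => rows.map(_.getInt(2)).sum)

// sketch only: a multi-column key as a tuple of the two wrappers
val rdd2 = df.rdd
  .keyBy(a => (new ExampleWrapperYear(a(0)), new ExampleWrapperMonth(a(1))))
  .groupByKey()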
This solution should work. You can modify/update hashCode and equals based on your different ciphertexts.
case class Year(var year: Int) {
  // normalize the stored year to its four-digit form before hashing,
  // so that equal years also produce equal hash codes
  override def hashCode(): Int = {
    this.year = this.year match {
      case 2012 => 2012
      case 12   => 2012
      case 13   => 2013
      case _    => this.year
    }
    this.year.hashCode()
  }
  // two years are equal when they agree on the last two digits
  // (assuming all years are in the 2000-2999 range)
  override def equals(that: Any): Boolean = {
    val year1 = 2000 + that.asInstanceOf[Year].year % 100
    val year2 = 2000 + this.year % 100
    year1 == year2
  }
}
case class Month(var month: String) {
  // normalize the stored month to its short form before hashing
  override def hashCode(): Int = {
    this.month = this.month match {
      case "January"  => "Jan"
      case "Janu"     => "Jan"
      case "February" => "Feb"
      case "Febr"     => "Feb"
      case _          => this.month
    }
    this.month.hashCode
  }
  // compare months after mapping both sides to the short form
  override def equals(that: Any): Boolean = {
    def normalize(m: String): String = m match {
      case "January"  => "Jan"
      case "Janu"     => "Jan"
      case "February" => "Feb"
      case "Febr"     => "Feb"
      case _          => m
    }
    normalize(this.month).equals(normalize(that.asInstanceOf[Month].month))
  }
}
And this is the important comparator for the grouping key, which simply uses the individual column comparators:
case class Key(var year: Year, var month: Month) {
  // combine the column hash codes; equal keys yield equal hashes
  override def hashCode(): Int = {
    this.year.hashCode() + this.month.hashCode()
  }
  // a key matches when both column comparators match
  override def equals(that: Any): Boolean = {
    this.year.equals(that.asInstanceOf[Key].year) &&
      this.month.equals(that.asInstanceOf[Key].month)
  }
}
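As a quick sanity check (a hypothetical REPL session, not part of the original code), the overridden comparators behave like this:

Year(12) == Year(2012)                                             // true: both normalize to 2012
Month("Janu") == Month("January")                                  // true: both normalize to "Jan"
Key(Year(13), Month("Febr")) == Key(Year(2013), Month("February")) // true: both columns match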
case class Record(year:Int,month:String,counter:Int)
import spark.implicits._

val df = spark.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data.csv").as[Record]
df.rdd.groupBy[Key](
(record:Record)=>Key(Year(record.year), Month(record.month)))
.map(x=> Record(x._1.year.year, x._1.month.month, x._2.toList.map(_.counter).sum))
.toDS().show()
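If the grouped rows are only needed for the sum, a shuffle-friendlier variant of the same idea (a sketch reusing the Key/Year/Month classes above, not part of the original answer) is reduceByKey, which also relies on the overridden hashCode/equals:

df.rdd
  .map(r => (Key(Year(r.year), Month(r.month)), r.counter))
  .reduceByKey(_ + _)  // combines per key using the custom equality
  .map { case (key, sum) => Record(key.year.year, key.month.month, sum) }
  .toDS().show()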
which gives
+----+-----+-------+
|year|month|counter|
+----+-----+-------+
|2012| Feb| 800|
|2013| Feb| 500|
|2012| Jan| 700|
+----+-----+-------+
for this input in data.csv
Year,Month,Counter
2012,February,400
2012,Jan,100
12,January,200
12,Janu,300
2012,Feb,400
13,Febr,500
2012,Jan,100
Note that the Year and Month case classes also update the value to the standard value (otherwise it is not predictable which of the equal values gets picked).
You can also implement the logic with UDFs to convert to a standard year/month format:
import org.apache.spark.sql.functions.{col, udf}

def toYear : (Integer) => Integer = (year:Integer)=>{
  2000 + year % 100 //assuming all years in the 2000-2999 range
}
def toMonth : (String) => String = (month:String)=>{
month match {
case "January"=> "Jan"
case "Janu"=> "Jan"
case "February" => "Feb"
case "Febr" => "Feb"
case _ => month
}
}
val toYearUdf = udf(toYear)
val toMonthUdf = udf(toMonth)
df.groupBy( toYearUdf(col("Year")), toMonthUdf(col("Month"))).sum("Counter").show()
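A small follow-up (the alias names here are just an illustration): aliasing the normalized columns keeps the output headers readable instead of showing the generated UDF expressions:

df.groupBy(
    toYearUdf(col("Year")).alias("Year"),
    toMonthUdf(col("Month")).alias("Month"))
  .sum("Counter")
  .show()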