How to do an outer join: Spark Scala SQLContext



I am trying to get Total (the count of all elements) and Top Elements (the count after a filter), so that I can find, across all the JSON records, the percentile (Top/Total) of each placeName with rating > 3:

    // sc : An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._              // for the $"..." column syntax
    import org.apache.spark.sql.functions._    // for explode, count, ...

    val df = sqlContext.jsonFile("temp.txt")
    //df.show()

    val res = df.withColumn("visited", explode($"visited"))
    val result = res.groupBy($"customerId", $"visited.placeName")
Tried with joins:

    val result1 = res.groupBy($"customerId", $"visited.placeName").agg(count("*").alias("total"))
    val result2 = res
      .filter($"visited.rating" < 4)
      .groupBy($"requestId", $"visited.placeName")
      .agg(count("*").alias("top"))
    result1.show()
    result2.show()
    percentile = result1.join(result2, List("placeName","customerId"), "outer")
    sqlContext.sql("select top/total as percentile from temp groupBy placeName")

But it gives me an error.

Can I do it in a UDF like this?

 val result1 =  result.withColumn("Top", getCount(res , true))
                    .withColumn("Total",getCount(result, false)).show()

    def getCount(df: DataFrame, flag: Boolean): Int {
            if (flag == "true") return df.filter($"visited.rating" < 3).groupBy($"customerId", $"visited.placeName").agg(count("*"))
            else return  df.agg(count("*"))
          }

My Schema:

 {
        "country": "France",
        "customerId": "France001",
        "visited": [
            {
                "placeName": "US",
                "rating": "2",
                "famousRest": "N/A",
                "placeId": "AVBS34"
            },
              {
                "placeName": "US",
                "rating": "3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"
            },
              {
                "placeName": "Canada",
                "rating": "3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"
            }        
    ]
}
US top = 1 count = 3
Canada top = 1 count = 3

{
        "country": "Canada",
        "customerId": "Canada012",
        "visited": [
            {
                "placeName": "UK",
                "rating": "3",
                "famousRest": "N/A",
                "placeId": "XSdce2"
            }
    ]
}
UK top = 1 count = 1

{
        "country": "France",
        "customerId": "France001",
        "visited": [
            {
                "placeName": "US",
                "rating": "4.3",
                "famousRest": "N/A",
                "placeId": "AVBS34"
            },
              {
                "placeName": "US",
                "rating": "3.3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"
            },
              {
                "placeName": "Canada",
                "rating": "4.3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"
            }        
    ]
}
US top = 2 count = 3
Canada top = 1 count = 3

So in the end I need something like this:

PlaceName  percentile
US         57.14            (1+1+2)/(3+1+3) *100
Canada     33.33            (1+1)/(3+3) *100
UK         100               1*100

Schema:

root
 |-- country: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- visited: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- placeId: string (nullable = true)
 |    |    |-- placeName: string (nullable = true)
 |    |    |-- famousRest: string (nullable = true)
 |    |    |-- rating: string (nullable = true)

Given the code you have provided, it is not clear how your source is structured or why you get this particular error, but generally speaking this code is not even valid:

  • getCount is not a UDF. That is not crucial here, but it is an important distinction.
  • getCount is not a valid function, because there is no col type in scope. Unless for some reason you are using it as a type alias for o.a.s.sql.DataFrame, it won't even compile!
  • Even if the types matched, Spark does not support nested operations / transformations, so you cannot run queries or aggregations on a Spark DataFrame from inside a UDF. A join-based alternative is sketched below.
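For reference, here is a minimal non-UDF sketch along the lines of the join attempt in the question: two plain aggregations joined on placeName. It assumes the temp.txt file from the question, reads rating as a double, and treats "top" as visits with rating > 3 (the threshold stated in the question's text); the threshold and the join keys may need adjusting to match the intended semantics.

    import org.apache.spark.sql.functions._
    import sqlContext.implicits._

    // One row per visit, with the nested fields pulled up to the top level.
    val visits = sqlContext.read.json("temp.txt")
      .select(explode($"visited").alias("visited"))
      .select(
        $"visited.placeName".alias("placeName"),
        $"visited.rating".cast("double").alias("rating"))

    // "total": all visits per placeName; "top": visits with rating > 3.
    val total = visits.groupBy($"placeName").agg(count("*").alias("total"))
    val top   = visits.filter($"rating" > 3).groupBy($"placeName").agg(count("*").alias("top"))

    // Outer join so that places which never pass the filter still show up;
    // a missing "top" is treated as 0 before computing the percentage.
    val percentile = total
      .join(top, Seq("placeName"), "left_outer")
      .withColumn("percentile", coalesce($"top", lit(0)) / $"total" * 100)

    percentile.show()

The left outer join mirrors the join in the question but keys only on placeName, since the final table is per place; grouping by customerId as well and joining on both columns works the same way, as long as both aggregates expose the same join columns.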
