Spark Couchbase Connector语言 - N1QL RDD to DataFrame



im尝试将RDD表单couchbase转换为数据框架(Scala 2.11-和Spark 2.1(,但要获得超载错误,我的代码在下面,有任何想法吗?另一个线程并没有完全回答。

IM在Databricks笔记本中进行此操作,然后我将沙发连接器用于纯数据范围,但是如果我想进行客户n1ql查询,有时会有更多的定制,这是我可以使用的最好的,请先使用RDDS?

首先有更好的方法可以在本机数据框架中执行此查询?我认为我需要使用N1QL和RDD,还是在这里错过了一些东西?

请让我知道我在下面的RDD转换代码上做错了什么,我还会得到:84:错误:超载方法值创建了带有替代方案:错误....谢谢!

val reconciliationSchema = 
   new StructType()
      .add("numEvents", IntegerType)
      .add("eventCategory", StringType)
      .add("eventName", StringType)
val orderEventsCouchbaseQuery = """
  SELECT 
    count(*) as numEvents, event.eventCategory, event.eventName
  FROM 
    events
  WHERE 
    STR_TO_UTC(event.eventOccurredTime)
      BETWEEN STR_TO_UTC("2017-06-16") AND STR_TO_UTC("2017-06-26")
  GROUP BY event.eventCategory, event.eventName
  order by event.eventCategory, event.eventName
"""
val queryResultRDD = sc.couchbaseQuery(N1qlQuery.simple(orderEventsCouchbaseQuery),"events").map(_.value)
val queryResultDF: DataFrame = spark.createDataFrame(queryResultRDD,reconciliationSchema)
display(queryResultDF)

我认为您遇到的问题不是与Couchbase相关的问题,而是Spark/Scala类型推理问题。当您使用createDataFrame时,在这种情况下,Spark需要与Row一起使用,而不是与该rdd的Couchbase查询的返回类型一起使用。

因此,这里有一些类似的示例代码,您可以看到当变成一行时,它可以正常工作:

val query = N1qlQuery.simple("" +
      "select country, count(*) as count " +
      "from `travel-sample` " +
      "where type = 'airport' " +
      "group by country " +
      "order by count desc")
val schema = StructType(
        StructField("count", IntegerType) ::
        StructField("country", StringType) :: Nil
    )
val rdd = spark.sparkContext.couchbaseQuery(query).map(r => Row(r.value.getInt("count"), r.value.getString("country")))
spark.createDataFrame(rdd, schema).show()

最新更新