我在Scala中有两个RDD,并将其转换为数据范围。现在我有两个dataframes.一个prodUniqueDF
,其中有两个名为prodid
和uid
的列,它具有产品的主数据
scala> prodUniqueDF.printSchema
root
|-- prodid: string (nullable = true)
|-- uid: long (nullable = false)
第二, ratingsDF
我的列名为 prodid
, custid
, ratings
scala> ratingsDF.printSchema
root
|-- prodid: string (nullable = true)
|-- custid: string (nullable = true)
|-- ratings: integer (nullable = false)
我想加入上面的两个,然后用ratingsDF
ratingsDF.prodid
中替换CC_8为此,我首先将它们注册为" Temptables"
prodUniqueDF.registerTempTable("prodUniqueDF")
ratingsDF.registerTempTable("ratingsDF")
我运行代码
val testSql = sql("SELECT prodUniqueDF.uid, ratingsDF.custid, ratingsDF.ratings FROM prodUniqueDF, ratingsDF WHERE prodUniqueDF.prodid = ratingsDF.prodid")
但是错误出现为:
org.apache.spark.sql.AnalysisException: Table not found: prodUniqueDF; line 1 pos 66
请帮忙!我该如何达到加入?有其他方法可以映射RDD吗?
可以轻松实现数据范围的连接,格式是
DataFrameA.join(DataFrameB)
默认情况下,它需要一个内在的加入,但是您还可以指定要执行的连接类型,并且它们具有API您可以在这里查找更多信息。
http://spark.apache.org/docs/latest/api/scala/scala/#org.apache.spark.sql.dataframe
用于替换现有列中的值,您可以从API
帮助使用Column方法的帮助这将是这样的:
val newDF = dfA.withColumn("newColumnName", dfB("columnName"))).drop("columnName").withColumnRenamed("newColumnName", "columnName")
我认为这可能会解决问题!