使用Spark DataFrame/DataSet/RDD更新内在加入



我正在转换MS SQL Server查询的逻辑引发。要转换的thge查询如下:

Update enc set PrUid=m.OriginalPrUid
FROM CachePatDemo enc 
inner join #MergePreMap m on enc.PrUid=m.NewPrUid
WHERE StatusId is null

我正在使用数据框进行转换,并且在两个数据范围内有两个表格,我将以内在的连接来加入。我需要找到一种方法来获取表1的所有列和更新的列(这两个表都是Commonin(。

我尝试使用以下方式:

val result = CachePatDemo.as("df123").
  join(MergePreMap.as("df321"), CachePatDemo("prUid") === MergePreMap("prUid"),"inner").where("StatusId is null")
  select($"df123.pId", 
         $"df321.provFname".as("firstName"), 
         $"df123.lastName", 
         $"df123.prUid")

它似乎无法解决我的问题。任何人都可以帮忙吗?

spark 2.1这可以工作

case class TestModel(x1: Int, x2: String, x3: Int)
object JoinDataFrames extends App {
  import org.apache.spark.sql.{DataFrame, SparkSession}
  val spark = SparkSession.builder.appName("GroupOperations").master("local[2]").enableHiveSupport.getOrCreate
  import spark.implicits._
  import org.apache.spark.sql.functions._
  val list1 = (3 to 10).toList.map(i => new TestModel(i, "This is df1 " + i, i * 3))
  val list2 = (0 to 5).toList.map(i => new TestModel(i, "This is df2 " + i, i * 13))
  val df1: DataFrame = spark.sqlContext.createDataFrame[TestModel](list1)
  val df2: DataFrame = spark.sqlContext.createDataFrame[TestModel](list2)
  val res = df1.join(df2, Seq("x1"), "inner")
  println("from DF1")
  res.select(df1("x2")).show()
  println("from DF2")
  res.select(df2("x2")).show()
}

相关内容

  • 没有找到相关文章

最新更新