如何使用Scala数据帧更新值



我有两个文件,结构如下

文件1
gnk_id, matchId, timestamp

文件2
gnk_matchid, matchid

我想更新文件1中gnk_id的值与文件2中matchid的值如果file1.gnk_id = file2.gnk_machid

为此我在Spark中创建了两个数据帧。我想知道我们是否可以在Spark中更新值?如果没有,是否有任何解决方法可以提供更新的最终文件?

更新

我做了这样的事情

case class GnkMatchId(gnk: String, gnk_matchid: String)
case class MatchGroup(gnkid: String, matchid: String, ts: String)
val gnkmatchidRDD = sc.textFile("000000000001").map(_.split(',')).map(x => (x(0),x(1)) )
val gnkmatchidDF = gnkmatchidRDD.map( x => GnkMatchId(x._1,x._2) ).toDF()
val matchGroupMr = sc.textFile("part-00000").map(_.split(',')).map(x => (x(0),x(1),x(2)) ).map( f => MatchGroup(f._1,f._2,f._3.toString) ).toDF()
val matchgrp_joinDF = matchGroupMr.join(gnkmatchidDF,matchGroupMr("gnkid") === gnkmatchidDF("gnk_matchid"),"left_outer")
matchgrp_joinDF.map(x => if(x.getAs[String]("gnk_matchid").length != 0 ) {MatchGroup(x.getAs[String]("gnk_matchid"), x.getAs[String]("matchid"),x.getAs[String]("ts"))} else {MatchGroup(x.getAs[String]("gnkid"), x.getAs[String]("matchid"),x.getAs[String]("ts"))}).toDF().show()

但是最后一步NULLpointerEXception失败

DataFrame是基于RDD的,所以你不能更新其中的值。

但是您可以通过添加新列来执行withColumn更新值。

在您的情况下,您可以通过使用UDF来实现joinwithColumn:

// df1: your File1
// +------+-------+---+
// |gnk_id|matchId| ts|
// +------+-------+---+
// |     1|     10|100|
// |     2|     20|200|
// +------+-------+---+
// df2: your File2
// +-----------+-------+
// |gnk_matchid|matchid|
// +-----------+-------+
// |          1|   1000|
// |          3|   3000|
// +-----------+-------+
// UDF: choose values from matchid or gnk_id for the new column
val myUDF = udf[Integer,Integer,Integer]((df2_matchid: Integer, df1_gnk_id: Integer) => {
  if (df2_matchid == null) df1_gnk_id
  else df2_matchid
})
df1.join(df2, $"gnk_id"===$"gnk_matchid", "left_outer")
  .select($"df1.*", $"df2.matchid" as "matchid2")
  .withColumn("gnk_id", myUDF($"matchid2", $"gnk_id"))
  .drop($"matchid2")
  .show()

输出如下:

+------+-------+---+
|gnk_id|matchId| ts|
+------+-------+---+
|  1000|     10|100|
|     2|     20|200|
+------+-------+---+

这可能是您正在寻找的连接。假设您的数据帧位于file1file2中,您可以尝试以下操作:

val result = file1
  .join(file2, file1("matchId") === file2("matchid"))
  .select(
    col("gnk_matchid").as("gnk_id"),
    col("matchId"),
    col("timestamp")
  )

这取决于你使用的数据源是否支持

  • 与文件,你只需要过滤掉的行,做出改变,并将其添加回来。

希望能有所帮助。

2018/04/11更新链接 .

最简单的实现方法,下面的代码读取每个批次的维度数据文件夹,但要记住新的维度数据值(在我的情况下是国家名称)必须是一个新文件。

流+批连接的如下解决方案

package com.databroccoli.streaming.dimensionupateinstreaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
object RefreshDimensionInStreaming {
  def main(args: Array[String]) = {
    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()
    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))
    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))
    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")
    var countryDf: Option[DataFrame] = None: Option[DataFrame]
    def updateDimensionDf() = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")
      if (countryDf != None) {
        countryDf.get.unpersist()
      }
      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))
      countryDf.get.show()
    }
    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)
        updateDimensionDf()
        batchDF
          .join(
            countryDf.get,
            expr(
              """
      countrycode_2 = countrycode 
      """
            ),
            "leftOuter"
          )
          .show
      }
      .start()
      .awaitTermination()
  }
}

相关内容

  • 没有找到相关文章

最新更新