我有两个文件,结构如下
文件1gnk_id, matchId, timestamp
文件2gnk_matchid, matchid
我想更新文件1中gnk_id
的值与文件2中matchid
的值如果file1.gnk_id = file2.gnk_machid
。
为此我在Spark中创建了两个数据帧。我想知道我们是否可以在Spark中更新值?如果没有,是否有任何解决方法可以提供更新的最终文件?
更新我做了这样的事情
case class GnkMatchId(gnk: String, gnk_matchid: String)
case class MatchGroup(gnkid: String, matchid: String, ts: String)
val gnkmatchidRDD = sc.textFile("000000000001").map(_.split(',')).map(x => (x(0),x(1)) )
val gnkmatchidDF = gnkmatchidRDD.map( x => GnkMatchId(x._1,x._2) ).toDF()
val matchGroupMr = sc.textFile("part-00000").map(_.split(',')).map(x => (x(0),x(1),x(2)) ).map( f => MatchGroup(f._1,f._2,f._3.toString) ).toDF()
val matchgrp_joinDF = matchGroupMr.join(gnkmatchidDF,matchGroupMr("gnkid") === gnkmatchidDF("gnk_matchid"),"left_outer")
matchgrp_joinDF.map(x => if(x.getAs[String]("gnk_matchid").length != 0 ) {MatchGroup(x.getAs[String]("gnk_matchid"), x.getAs[String]("matchid"),x.getAs[String]("ts"))} else {MatchGroup(x.getAs[String]("gnkid"), x.getAs[String]("matchid"),x.getAs[String]("ts"))}).toDF().show()
但是最后一步NULLpointerEXception失败
DataFrame
是基于RDD
的,所以你不能更新其中的值。
但是您可以通过添加新列来执行withColumn
更新值。
在您的情况下,您可以通过使用UDF来实现join
和withColumn
:
// df1: your File1
// +------+-------+---+
// |gnk_id|matchId| ts|
// +------+-------+---+
// | 1| 10|100|
// | 2| 20|200|
// +------+-------+---+
// df2: your File2
// +-----------+-------+
// |gnk_matchid|matchid|
// +-----------+-------+
// | 1| 1000|
// | 3| 3000|
// +-----------+-------+
// UDF: choose values from matchid or gnk_id for the new column
val myUDF = udf[Integer,Integer,Integer]((df2_matchid: Integer, df1_gnk_id: Integer) => {
if (df2_matchid == null) df1_gnk_id
else df2_matchid
})
df1.join(df2, $"gnk_id"===$"gnk_matchid", "left_outer")
.select($"df1.*", $"df2.matchid" as "matchid2")
.withColumn("gnk_id", myUDF($"matchid2", $"gnk_id"))
.drop($"matchid2")
.show()
输出如下:+------+-------+---+
|gnk_id|matchId| ts|
+------+-------+---+
| 1000| 10|100|
| 2| 20|200|
+------+-------+---+
这可能是您正在寻找的连接。假设您的数据帧位于file1
和file2
中,您可以尝试以下操作:
val result = file1
.join(file2, file1("matchId") === file2("matchid"))
.select(
col("gnk_matchid").as("gnk_id"),
col("matchId"),
col("timestamp")
)
这取决于你使用的数据源是否支持
- 与文件,你只需要过滤掉的行,做出改变,并将其添加回来。
希望能有所帮助。
2018/04/11更新链接 .
最简单的实现方法,下面的代码读取每个批次的维度数据文件夹,但要记住新的维度数据值(在我的情况下是国家名称)必须是一个新文件。
流+批连接的如下解决方案
package com.databroccoli.streaming.dimensionupateinstreaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
object RefreshDimensionInStreaming {
def main(args: Array[String]) = {
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
Logger.getLogger("io.netty").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
val schemaUntyped1 = StructType(
Array(
StructField("id", StringType),
StructField("customrid", StringType),
StructField("customername", StringType),
StructField("countrycode", StringType),
StructField("timestamp_column_fin_1", TimestampType)
))
val schemaUntyped2 = StructType(
Array(
StructField("id", StringType),
StructField("countrycode", StringType),
StructField("countryname", StringType),
StructField("timestamp_column_fin_2", TimestampType)
))
val factDf1 = spark.readStream
.schema(schemaUntyped1)
.option("header", "true")
.csv("src/main/resources/broadcasttest/fact")
var countryDf: Option[DataFrame] = None: Option[DataFrame]
def updateDimensionDf() = {
val dimDf2 = spark.read
.schema(schemaUntyped2)
.option("header", "true")
.csv("src/main/resources/broadcasttest/dimension")
if (countryDf != None) {
countryDf.get.unpersist()
}
countryDf = Some(
dimDf2
.withColumnRenamed("id", "id_2")
.withColumnRenamed("countrycode", "countrycode_2"))
countryDf.get.show()
}
factDf1.writeStream
.outputMode("append")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.show(10)
updateDimensionDf()
batchDF
.join(
countryDf.get,
expr(
"""
countrycode_2 = countrycode
"""
),
"leftOuter"
)
.show
}
.start()
.awaitTermination()
}
}