我有两个我从两个文本文件创建的数据帧。DF1在这里:
+--------------------+----------+------+---------+--------+------+
| EntryDate| OgId|ItemId|segmentId|Sequence|Action|
+--------------------+----------+------+---------+--------+------+
|2017-06-07T09:04:...|4295877341| 136| 4| 1| I|!||
|2017-06-07T09:04:...|4295877346| 136| 4| 1| I|!||
|2017-06-07T09:04:...|4295877341| 138| 2| 1| I|!||
|2017-06-07T09:04:...|4295877341| 141| 4| 1| I|!||
|2017-06-07T09:04:...|4295877341| 143| 2| 1| I|!||
|2017-06-07T09:04:...|4295877341| 145| 14| 1| I|!||
|2017-06-07T09:04:...| 123456789| 145| 14| 1| I|!||
+--------------------+----------+------+---------+--------+------+
df2在这里:
+--------------------+----------+------+---------+--------+------+
| EntryDate value| OgId|ItemId|segmentId|Sequence|Action|
+--------------------+----------+------+---------+--------+------+
|2017-06-07T09:04:...|4295877341| 136| 4| 1| I|!||
|2017-06-07T09:05:...|4295877341| 136| 5| 2| I|!||
|2017-06-07T09:06:...|4295877341| 138| 4| 5| I|!||
|2017-06-07T09:07:...|4295877341| 141| 9| 1| I|!||
|2017-06-07T09:08:...|4295877341| 143| null| 2| I|!||
|2017-06-07T09:09:...|4295877343| 149| 14| 2| I|!||
|2017-06-07T09:10:...| 123456789| 145| 14| 1| D|!||
+--------------------+----------+------+---------+--------+------+
现在,我必须以最终数据框架具有唯一记录的方式加入这两个数据框架。
同样,如果DF2具有任何列值null,则DF1相应的值应在最终输出
中这里的操作标签'u'for Update'd'用于删除。
我的最终输出文件应该像以下
+----------+------+---------+--------+------+
| OgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341| 136| 5| 2| I|!||
|4295877346| 136| 4| 1| I|!||
|4295877341| 138| 4| 5| I|!||
|4295877341| 141| 9| 1| I|!||
|4295877341| 143| 2| 2| I|!||
|4295877341| 145| 14| 1| I|!||
|4295877343| 149| 14| 2| I|!||
+----------+------+---------+--------+------+
我两个数据框架的主要键是ogid itemid。
这是我从答案之一中得到的
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.unionAll(df2).distinct()
df1.show()
,但我想按进入日期的顺序更新DF2。
例如4295877341 |136有两个更新,因此DF2将以与DF2相同的数据顺序进行更新。
这是因为某个行更新的某个时候首先发生。然后删除。
最后,如果动作为" D",则DF1的行将被删除,这也应该以适当的顺序进行。
我希望我的问题清楚。
更新建议的答案代码..
package sparkSql
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
object PcfpDiff {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]"); //Creating spark configuration
// val conf = new SparkConf().setAppName("WordCount");
conf.set("spark.shuffle.blockTransferService", "nio")
val sc = new SparkContext(conf); //Creating spark context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ StructType, StructField, StringType, DoubleType, IntegerType }
import org.apache.spark.sql.functions.udf
val schema = StructType(Array(
StructField("OrgId", StringType),
StructField("ItemId", StringType),
StructField("segmentId", StringType),
StructField("Sequence", StringType),
StructField("Action", StringType)))
import org.apache.spark.sql.functions._
val textRdd1 = sc.textFile("/home/cloudera/TRF/pcfp/Text1.txt")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
val textRdd2 = sc.textFile("/home/cloudera/TRF/pcfp/Text2.txt")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
val tempdf2 = df2.withColumnRenamed("segmentId", "segmentId_1").withColumnRenamed("Sequence", "Sequence_1").withColumnRenamed("Action", "Action_1")
df1.join(tempdf2, Seq("OrgId", "ItemId"), "outer")
.select($"OrgId", $"ItemId",
when($"segmentId_1".isNotNull, $"segmentId_1").otherwise($"segmentId").as("segmentId"),
when($"Sequence_1".isNotNull, $"Sequence_1").otherwise($"Sequence").as("Sequence"),
when($"Action_1".isNotNull, $"Action_1").otherwise($"Action").as("Action"))
df1.show()
}
}
我要低于输出...分段和sequenceId没有更新。
+----------+------+---------+--------+------+
|4295877341| 136| 4| 1| I|!||
|4295877346| 136| 4| 1| I|!||
|4295877341| 138| 2| 1| I|!||
|4295877341| 141| 4| 1| I|!||
|4295877341| 143| 2| 1| I|!||
|4295877341| 145| 14| 1| I|!||
| 123456789| 145| 14| 1| I|!||
+----------+------+---------+--------+------+
数据集1
4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|
数据集2
4295877341|^|136|^|4|^|1|^|I|!|
4295877341|^|136|^|5|^|2|^|I|!|
4295877341|^|138|^|4|^|5|^|I|!|
4295877341|^|141|^|9|^|1|^|I|!|
4295877341|^|143|^|null|^|2|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|
这是我为您提供的工作解决方案
val tempdf2 = df2.except(df1).withColumnRenamed("segmentId", "segmentId_1")
.withColumnRenamed("Sequence", "Sequence_1")
.withColumnRenamed("Action", "Action_1")
val df3 = df1.join(tempdf2, Seq("OrgId", "ItemId"), "outer")
.select($"OrgId", $"ItemId",
when($"segmentId_1" =!= "null", $"segmentId_1").otherwise($"segmentId").as("segmentId"),
when($"Sequence_1" =!= "null", $"Sequence_1").otherwise($"Sequence").as("Sequence"),
when($"Action_1" =!= "null", $"Action_1").otherwise($"Action").as("Action"))
.filter(!$"Action".contains("D"))
df3.show()
我希望答案很有帮助,如果没有,您可以采取想法,根据您的需要进行修改。