如何处理HDFS hadoop Map-Reduce中的增量更新



我在HDF中有结构化的基本文本文件,其中包含这样的数据(在file.txt中):

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|

文件大小.txt为 30 GB。

我有增量数据文件1.txt大小约为2 GB,在HFDS中以相同的格式出现,如下所示:

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|

现在,我必须将 file.txt 和 file1 结合起来.txt并创建一个包含所有唯一记录的最终文本文件。

这两个文件中的密钥都是 OrgId。如果在第一个文件中找到相同的 OrgId,那么我必须用新的 OrgId 替换,如果没有,那么我必须插入新的 OrgId。

最终输出是这样的。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|

如何在mapreduce中做到这一点?

我不打算使用 HIVE 解决方案,因为我有很多这样的不同文件,大约 10.000,所以我必须在 HIVE 中创建 10.000 个分区。

有什么建议在这个用例中使用Spark吗?

我建议你为sparkscala编程。如果你用mapreduce编程,它只对hadoop有用,但用scala编程spark将使您能够在sparkhadoop中进行处理。 启动Spark是为了处理mapreduce模型中的缺点。您可以找到有关此主题的许多资源。其中之一是这个

关于您的问题,我建议您使用dataframe

第一个任务是为数据帧创建schema

val schema = StructType(Array(StructField("OgId", StringType),
StructField("ItemId", StringType),
StructField("segmentId", StringType),
StructField("Sequence", StringType),
StructField("Action", StringType)))

下一个任务是读取这两个文件并使用上述架构创建数据帧

import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

df1的输出为

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136   |4        |1       |I     |
|4295877346|136   |4        |1       |I     |
|4295877341|138   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877341|145   |14       |1       |I     |
+----------+------+---------+--------+------+

df2的输出是

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

现在,根据您的要求,如果要从df1中删除rows,如果OgIddf2匹配,并将所有df2附加到df1。这些要求可以按如下方式完成

val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)

最终输出为

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136   |4        |1       |I     |
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

此最终结果可以保存为hdfs

df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")

我希望这是有帮助的

注意:请确保正确写入输入和输出位置的路径

最新更新