我在HDF中有结构化的基本文本文件,其中包含这样的数据(在file.txt中):
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|
文件大小.txt为 30 GB。
我有增量数据文件1.txt大小约为2 GB,在HFDS中以相同的格式出现,如下所示:
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|
现在,我必须将 file.txt 和 file1 结合起来.txt并创建一个包含所有唯一记录的最终文本文件。
这两个文件中的密钥都是 OrgId。如果在第一个文件中找到相同的 OrgId,那么我必须用新的 OrgId 替换,如果没有,那么我必须插入新的 OrgId。
最终输出是这样的。
OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
如何在mapreduce中做到这一点?
我不打算使用 HIVE 解决方案,因为我有很多这样的不同文件,大约 10.000,所以我必须在 HIVE 中创建 10.000 个分区。
有什么建议在这个用例中使用Spark吗?
我建议你为spark
scala
编程。如果你用mapreduce
编程,它只对hadoop
有用,但用scala
编程spark
将使您能够在spark
和hadoop
中进行处理。 启动Spark
是为了处理mapreduce
模型中的缺点。您可以找到有关此主题的许多资源。其中之一是这个
关于您的问题,我建议您使用dataframe
第一个任务是为数据帧创建schema
。
val schema = StructType(Array(StructField("OgId", StringType),
StructField("ItemId", StringType),
StructField("segmentId", StringType),
StructField("Sequence", StringType),
StructField("Action", StringType)))
下一个任务是读取这两个文件并使用上述架构创建数据帧
import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))
df1
的输出为
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136 |4 |1 |I |
|4295877346|136 |4 |1 |I |
|4295877341|138 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877341|145 |14 |1 |I |
+----------+------+---------+--------+------+
df2
的输出是
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
现在,根据您的要求,如果要从df1
中删除rows
,如果OgId
与df2
匹配,并将所有df2
附加到df1
。这些要求可以按如下方式完成
val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")
df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)
最终输出为
+----------+------+---------+--------+------+
|OgId |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136 |4 |1 |I |
|4295877341|213 |4 |1 |I |
|4295877341|215 |2 |1 |I |
|4295877341|141 |4 |1 |I |
|4295877341|143 |2 |1 |I |
|4295877343|149 |14 |2 |I |
+----------+------+---------+--------+------+
此最终结果可以保存为hdfs
df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")
我希望这是有帮助的
注意:请确保正确写入输入和输出位置的路径