我必须比较CSV文件,然后我必须删除所有重复的行。所以,我的情况就像我有一个文件夹,我必须把每个过滤结果放在那个文件夹中,当一些新文件出现时,我必须将文件夹中的现有文件与新文件进行比较,最后,我必须将结果放回同一个文件夹。
eg: /data/ingestion/file1.csv a1 b1 c1 a2 b2 c2 a3 b3 c3 /data/ingestion/file2.csv a4 b4 c4 a5 b5 c5 a6 b6 c6 new upcoming file(upcoming_file.csv): a1 b1 c1 a5 b5 c5 a7 b7 c7
现在我的方法是从/data/ingestion/* 中存在的所有文件创建一个数据帧。然后创建一个upcoming_file.csv数据帧,并使用联合操作追加这两个数据帧。最后,应用不同的转换。 现在我必须把它写回/data/ingestion,确保那里没有重复性。因此,我选择覆盖操作。
deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion/")
然后我最终删除了文件夹/data/ingestion中的所有内容。 甚至新数据帧也不会写入 CSV 文件。
我也尝试了其他选择,但我没有实现我上面解释的!
提前感谢!
我建议将输出写入 hdfs 上的新目录 - 如果处理失败,您将始终能够丢弃已处理的任何内容并使用原始数据从头开始处理 - 这是安全且简单的。 :)
处理完成后 - 只需删除旧的并将新的重命名为旧的名称。
更新:
deleted_duplicate.write
.format("csv")
.mode("overwrite")
.save("hdfs://localhost:8020/data/ingestion_tmp/")
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
FileSystem hdfs = FileSystem.get(URI.create("hdfs://<namenode-hostname>:<port>"), conf);
hdfs.delete("hdfs://localhost:8020/data/ingestion", isRecusrive);
hdfs.rename("hdfs://localhost:8020/data/ingestion_tmp", "hdfs://localhost:8020/data/ingestion");
这是HDFS文件系统API文档的链接