我正在编写一个程序,每天从一个特定的数据库将我们所有的配置单元表上传到s3。然而,这个数据库包含许多年前的记录,对于完整的副本/distcp来说太大了。
我想在HDFS中搜索包含数据库的整个目录,只获取last_modified_date在指定(输入(日期之后的文件。
然后,我将对s3进行这些匹配文件的完整distcp。(如果我只需要将匹配文件的路径/名称复制到一个单独的文件中,然后从这个额外的文件中进行distcp,那也没关系。(
在网上查看时,我发现我可以使用-t
标志按文件上次修改的日期对文件进行排序,所以我开始使用这样的方法:hdfs dfs -ls -R -t <path_to_db>
,但这还不够。它打印了大约500000个文件,我仍然需要弄清楚如何修剪这个输入日期之前的文件。。。
编辑:我正在写一个Python脚本,很抱歉最初没有澄清!
EDIT pt2:我应该注意,我需要遍历几千个,甚至几十万个文件。为了解决我的问题,我写了一个基本的脚本,但运行起来需要非常长的时间。需要一种加快流程的方法。。。。
我不确定您是否使用Java,但这里有一个可以做的示例:。为了使用lastModified,我做了一些小修改。
import java.io.*; import java.util.*; import java.net.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; // For Date Conversion from long to human readable. import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; public class FileStatusChecker { public static void main (String [] args) throws Exception { try{ FileSystem fs = FileSystem.get(new Configuration()); String hdfsFilePath = "hdfs://My-NN-HA/Demos/SparkDemos/inputFile.txt"; FileStatus[] status = fs.listStatus(new Path(hdfsFilePath)); // you need to pass in your hdfs path for (int i=0;i<status.length;i++){ long lastModifiedTimeLong = status[i].lastModified(); Date lastModifiedTimeDate = new Date(lastModifiedTimeLong); DateFormat df = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z"); System.out.println("The file '"+ hdfsFilePath + "' was accessed last at: "+ df.format(lastModifiedTimeDate)); } }catch(Exception e){ System.out.println("File not found"); e.printStackTrace(); } } }
它将使您能够创建一个文件列表;事物;和他们在一起。
您可以使用WebHDFS提取完全相同的信息:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
这可能与Python一起使用更友好。
示例:
文件/目录的状态提交HTTP GET请求。
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS" The client receives a response with a FileStatus JSON object: HTTP/1.1 200 OK Content-Type: application/json Transfer-Encoding: chunked { "FileStatus": { "accessTime" : 0, "blockSize" : 0, "group" : "supergroup", "length" : 0, //in bytes, zero for directories "modificationTime": 1320173277227, "owner" : "webuser", "pathSuffix" : "", "permission" : "777", "replication" : 0, "type" : "DIRECTORY" //enum {FILE, DIRECTORY} } }
列出目录提交HTTP GET请求。
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS" The client receives a response with a FileStatuses JSON object: HTTP/1.1 200 OK Content-Type: application/json Content-Length: 427 { "FileStatuses": { "FileStatus": [ { "accessTime" : 1320171722771, "blockSize" : 33554432, "group" : "supergroup", "length" : 24930, "modificationTime": 1320171722771, "owner" : "webuser", "pathSuffix" : "a.patch", "permission" : "644", "replication" : 1, "type" : "FILE" }, { "accessTime" : 0, "blockSize" : 0, "group" : "supergroup", "length" : 0, "modificationTime": 1320895981256, "owner" : "szetszwo", "pathSuffix" : "bar", "permission" : "711", "replication" : 0, "type" : "DIRECTORY" }, ... ] } }