如何在特定日期后抓取所有配置单元文件进行s3上传(python)



我正在编写一个程序,每天从一个特定的数据库将我们所有的配置单元表上传到s3。然而,这个数据库包含许多年前的记录,对于完整的副本/distcp来说太大了。

我想在HDFS中搜索包含数据库的整个目录,只获取last_modified_date在指定(输入(日期之后的文件

然后,我将对s3进行这些匹配文件的完整distcp。(如果我只需要将匹配文件的路径/名称复制到一个单独的文件中,然后从这个额外的文件中进行distcp,那也没关系。(

在网上查看时,我发现我可以使用-t标志按文件上次修改的日期对文件进行排序,所以我开始使用这样的方法:hdfs dfs -ls -R -t <path_to_db>,但这还不够。它打印了大约500000个文件,我仍然需要弄清楚如何修剪这个输入日期之前的文件。。。

编辑:我正在写一个Python脚本,很抱歉最初没有澄清!

EDIT pt2:我应该注意,我需要遍历几千个,甚至几十万个文件。为了解决我的问题,我写了一个基本的脚本,但运行起来需要非常长的时间。需要一种加快流程的方法。。。。

我不确定您是否使用Java,但这里有一个可以做的示例:。为了使用lastModified,我做了一些小修改。

import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

// For Date Conversion from long to human readable.
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
public class FileStatusChecker {
public static void main (String [] args) throws Exception {
try{
FileSystem fs = FileSystem.get(new Configuration());
String hdfsFilePath = "hdfs://My-NN-HA/Demos/SparkDemos/inputFile.txt";
FileStatus[] status = fs.listStatus(new Path(hdfsFilePath));  // you need to pass in your hdfs path
for (int i=0;i<status.length;i++){
long lastModifiedTimeLong = status[i].lastModified();
Date lastModifiedTimeDate = new Date(lastModifiedTimeLong);
DateFormat df = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z");
System.out.println("The file '"+ hdfsFilePath + "' was accessed last at: "+ df.format(lastModifiedTimeDate));
}
}catch(Exception e){
System.out.println("File not found");
e.printStackTrace();
}
}
}

它将使您能够创建一个文件列表;事物;和他们在一起。

您可以使用WebHDFS提取完全相同的信息:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html

这可能与Python一起使用更友好。

示例:

文件/目录的状态提交HTTP GET请求。

curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
The client receives a response with a FileStatus JSON object:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{
"FileStatus":
{
"accessTime"      : 0,
"blockSize"       : 0,
"group"           : "supergroup",
"length"          : 0,             //in bytes, zero for directories
"modificationTime": 1320173277227,
"owner"           : "webuser",
"pathSuffix"      : "",
"permission"      : "777",
"replication"     : 0,
"type"            : "DIRECTORY"    //enum {FILE, DIRECTORY}
}
}

列出目录提交HTTP GET请求。

curl -i  "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
The client receives a response with a FileStatuses JSON object:
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 427
{
"FileStatuses":
{
"FileStatus":
[
{
"accessTime"      : 1320171722771,
"blockSize"       : 33554432,
"group"           : "supergroup",
"length"          : 24930,
"modificationTime": 1320171722771,
"owner"           : "webuser",
"pathSuffix"      : "a.patch",
"permission"      : "644",
"replication"     : 1,
"type"            : "FILE"
},
{
"accessTime"      : 0,
"blockSize"       : 0,
"group"           : "supergroup",
"length"          : 0,
"modificationTime": 1320895981256,
"owner"           : "szetszwo",
"pathSuffix"      : "bar",
"permission"      : "711",
"replication"     : 0,
"type"            : "DIRECTORY"
},
...
]
}
}

相关内容

  • 没有找到相关文章

最新更新