I want to read file paths regardless of whether they are on HDFS or the local file system. Currently, I pass local paths with the prefix file:// and HDFS paths with the prefix hdfs://, and write some code as follows:
Configuration configuration = new Configuration();
FileSystem fileSystem = null;
if (filePath.startsWith("hdfs://")) {
    fileSystem = FileSystem.get(configuration);
} else if (filePath.startsWith("file://")) {
    fileSystem = FileSystem.getLocal(configuration).getRawFileSystem();
}
From there I use the FileSystem API to read the file.
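For example, the read itself looks roughly like this (just a sketch; error handling omitted):

FSDataInputStream in = fileSystem.open(new Path(filePath));
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
    System.out.println(line);
}
reader.close();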
Can you tell me whether there is a better way to do this?
Does this make sense?
public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/hadoop/projects/hadoop-1.0.4/conf/core-site.xml"));
    conf.addResource(new Path("/hadoop/projects/hadoop-1.0.4/conf/hdfs-site.xml"));

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    System.out.println("Enter the file path...");
    String filePath = br.readLine();

    Path path = new Path(filePath);
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream inputStream = fs.open(path);
    System.out.println(inputStream.available());
    inputStream.close();
    fs.close();
}
If you go this route, you don't have to do that prefix check yourself. Just get the FileSystem directly from the Path and then do whatever you want with it.
You can get the FileSystem as follows:
Configuration conf = new Configuration();
Path path = new Path(stringPath);
FileSystem fs = FileSystem.get(path.toUri(), conf);
There is no need to check whether the path starts with hdfs:// or file://.
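For example, a quick sketch (both paths are hypothetical) showing that the same code then serves both schemes:

Configuration conf = new Configuration();
String[] stringPaths = {
    "hdfs://namenode:8020/user/me/input.txt", // hypothetical HDFS path
    "file:///tmp/input.txt"                   // hypothetical local path
};
for (String stringPath : stringPaths) {
    Path path = new Path(stringPath);
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    System.out.println(stringPath + " exists: " + fs.exists(path));
}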
Please check the code snippet below, which lists files from an HDFS path, i.e. a path string starting with hdfs://. If you provide a Hadoop configuration and a local path, it will also list files from the local file system, i.e. a path string starting with file://.
//helper method to get the list of files from the HDFS path
public static List<String> listFilesFromHDFSPath(Configuration hadoopConfiguration, String hdfsPath,
                                                 boolean recursive)
{
    //resulting list of files
    List<String> filePaths = new ArrayList<String>();
    FileSystem fs = null;

    //try-catch-finally all possible exceptions
    try
    {
        //get path from string and then the filesystem
        Path path = new Path(hdfsPath); //throws IllegalArgumentException, all others will only throw IOException
        fs = path.getFileSystem(hadoopConfiguration);

        //resolve hdfsPath first to check whether the path exists => either a real directory or a real file
        //resolvePath() returns a fully-qualified variant of the path
        path = fs.resolvePath(path);

        //if recursive approach is requested
        if (recursive)
        {
            //(heap issues with recursive approach) => using a queue
            Queue<Path> fileQueue = new LinkedList<Path>();

            //add the obtained path to the queue
            fileQueue.add(path);

            //while the fileQueue is not empty
            while (!fileQueue.isEmpty())
            {
                //get the file path from queue
                Path filePath = fileQueue.remove();

                //filePath refers to a file
                if (fs.isFile(filePath))
                {
                    filePaths.add(filePath.toString());
                }
                else //else filePath refers to a directory
                {
                    //list paths in the directory and add to the queue
                    FileStatus[] fileStatuses = fs.listStatus(filePath);
                    for (FileStatus fileStatus : fileStatuses)
                    {
                        fileQueue.add(fileStatus.getPath());
                    } // for
                } // else
            } // while
        } // if
        else //non-recursive approach => no heap overhead
        {
            //if the given hdfsPath is actually a directory
            if (fs.isDirectory(path))
            {
                FileStatus[] fileStatuses = fs.listStatus(path);

                //loop all file statuses
                for (FileStatus fileStatus : fileStatuses)
                {
                    //if the given status is a file, then update the resulting list
                    if (fileStatus.isFile())
                        filePaths.add(fileStatus.getPath().toString());
                } // for
            } // if
            else //it is a file then
            {
                //return the one and only file path to the resulting list
                filePaths.add(path.toString());
            } // else
        } // else
    } // try
    catch(Exception ex) //will catch all exceptions including IOException and IllegalArgumentException
    {
        ex.printStackTrace();

        //if some problem occurs return an empty array list
        return new ArrayList<String>();
    } // catch
    finally
    {
        //close the filesystem; no more operations after this point
        try
        {
            if (fs != null)
                fs.close();
        } catch (IOException e)
        {
            e.printStackTrace();
        } // catch
    } // finally

    //return the resulting list; list can be empty if given path is an empty directory without files and sub-directories
    return filePaths;
} // listFilesFromHDFSPath
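A minimal usage sketch (the directory path is a placeholder):

Configuration conf = new Configuration();
//recursively collect every file under the given HDFS directory
List<String> files = listFilesFromHDFSPath(conf, "hdfs://namenode:8020/user/me/data", true);
for (String file : files)
    System.out.println(file);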
If you really want to use the java.io.File API, then the following method will help you list files from the local file system only, i.e. path strings starting with file://.
//helper method to list files from the local path in the local file system
public static List<String> listFilesFromLocalPath(String localPathString, boolean recursive)
{
    //resulting list of files
    List<String> localFilePaths = new ArrayList<String>();

    //get the Java file instance from local path string
    File localPath = new File(localPathString);

    //this case is possible if the given localPathString does not exist => it is neither a file nor a directory
    if (!localPath.exists())
    {
        System.err.println("\n" + localPathString + " is neither a file nor a directory; please provide a correct local path");

        //return with empty list
        return new ArrayList<String>();
    } // if

    //at this point localPath does exist in the file system => either as a directory or a file

    //if recursive approach is requested
    if (recursive)
    {
        //recursive approach => using a queue
        Queue<File> fileQueue = new LinkedList<File>();

        //add the file in obtained path to the queue
        fileQueue.add(localPath);

        //while the fileQueue is not empty
        while (!fileQueue.isEmpty())
        {
            //get the file from queue
            File file = fileQueue.remove();

            //file instance refers to a file
            if (file.isFile())
            {
                //update the list with file absolute path
                localFilePaths.add(file.getAbsolutePath());
            } // if
            else //else file instance refers to a directory
            {
                //list files in the directory and add to the queue
                File[] listedFiles = file.listFiles();
                for (File listedFile : listedFiles)
                {
                    fileQueue.add(listedFile);
                } // for
            } // else
        } // while
    } // if
    else //non-recursive approach
    {
        //if the given localPathString is actually a directory
        if (localPath.isDirectory())
        {
            File[] listedFiles = localPath.listFiles();

            //loop all listed files
            for (File listedFile : listedFiles)
            {
                //if the given listedFile is actually a file, then update the resulting list
                if (listedFile.isFile())
                    localFilePaths.add(listedFile.getAbsolutePath());
            } // for
        } // if
        else //it is a file then
        {
            //return the one and only file absolute path to the resulting list
            localFilePaths.add(localPath.getAbsolutePath());
        } // else
    } // else

    //return the resulting list; list can be empty if given path is an empty directory without files and sub-directories
    return localFilePaths;
} // listFilesFromLocalPath
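Usage is analogous (the directory path is again a placeholder):

//non-recursively list only the files directly under /tmp/data
List<String> localFiles = listFilesFromLocalPath("/tmp/data", false);
for (String localFile : localFiles)
    System.out.println(localFile);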
This works:
package com.leerhdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

public class ReadWriteHDFSExample {

    public static void main(String[] args) throws IOException {
        Path inFile = new Path(args[0]);
        String destinosrc = args[1];

        Configuration conf = new Configuration();

        //open the input file through the FileSystem that its own path resolves to,
        //so the source and destination may use different schemes
        FileSystem inFs = inFile.getFileSystem(conf);
        FSDataInputStream in = inFs.open(inFile);

        //the destination FileSystem is resolved from the destination URI
        FileSystem fs = FileSystem.get(URI.create(destinosrc), conf);

        //Progressable is called back periodically so the copy can report progress
        OutputStream out = fs.create(new Path(destinosrc), new Progressable() {
            public void progress() {
                System.out.println("Reading and writing...");
            }
        });

        //copyBytes with true closes both streams once the copy finishes
        IOUtils.copyBytes(in, out, 4096, true);
    }
}