HDFS使用多线程读取



我正在使用生产者-消费者模型,利用BlockingQueue,使用多线程从HDFS目录读取文件。

这是我的密码;

生产者类别:

public void readURLS() {
final int capacity = Integer.MAX_VALUE;
BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
try {
FileSystem hdfs = FileSystem.get(hadoopConf);
FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));
int i = 0;
for (FileStatus file : status) {
LOG.info("Thread {} started: ", i++);
LOG.info("Reading file {} ", file.getPath().getName());
new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
}
} catch (IOException e) {
LOG.error("IOException occured while listing files from HDFS directory");
}
}

获取数据:

@Override
public void run() {
LOG.info("Inside reader to start reading the files ");
try (BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader
(FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {

String line;
while ((line = bufferedReader.readLine()) != null) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
LOG.info("Line is :{}", line);
queue.put(line);
}

} catch (IOException e) {
LOG.error("file : {} ", file.toString());
throw new IOException(e);
} catch (InterruptedException e) {
LOG.error("An error has occurred: ", e);
Thread.currentThread().interrupt();
}

在执行代码时,它向我抛出InterruptedIOException:

java.io.IOException: Failed on local exception: java.io.**InterruptedIOException**: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected 

知道为什么吗。我的想法是在每个文件上循环,并使用一个单独的线程读取每个文件。

当从多个(许多!(线程使用HDFS时,我也会得到相同的行为,并且不知道问题的答案"为什么&";,但是保持同时访问HDFS的线程数量似乎有帮助。

在您的情况下,我建议使用线程数量有限的ExecutorService,并在没有出现异常时将该数量微调到极限。

因此,创建ExecutorService(以10个线程为起点(:

final ExecutorService executorService = Executors.newFixedThreadPool(10);

而不是你的

new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();

进行

executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));

另一个改进是,由于org.apache.hadoop.fs.FileSystem实现了Closeable,您应该关闭它。在您的代码中,每个线程都会创建一个FileSystem的新实例,但不会关闭它。所以我会将它提取到try:中的一个变量中

try (FileSystem fileSystem = FileSystem.get(hadoopConf);
BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader
(fileSystem.open(file), StandardCharsets.UTF_8))) {

更新:

尽管上面的代码似乎是Closeable对象的正确方法,但默认情况下,FileSystem.get将从返回缓存的实例

/** FileSystem cache */
static final Cache CACHE = new Cache();

因此当CCD_ 7将被调用时,事情将可怕地破裂。

您可以通过将fs.hdfs.impl.disable.cache配置参数设置为true来禁用FileSystem缓存,也可以确保FileSystem实例仅在所有工作进程完成时关闭。似乎您也可以只为所有工作人员使用一个FileSystem实例,尽管我在javadocs中找不到任何确认,即在没有额外同步的情况下,这将正常工作。

最新更新