使用python从HDFS获取文件名列表



Hadoop新手。

我已经搜索了一些关于如何开始使用hadoop和python的教程,但没有多少成功。我还不需要对映射器和reducer做任何工作,但这更多的是一个访问问题。

作为Hadoop集群的一部分,HDFS上有一堆。dat文件。

为了在我的客户端(本地计算机)上使用Python访问这些文件,

我需要在我的电脑上安装什么?

如何查询HDFS上的文件名

任何链接也会有帮助。

据我所知,没有开箱即用的解决方案,我发现的大多数答案都诉诸于调用hdfs命令。我在Linux上运行,遇到了同样的挑战。我发现sh包很有用。这将为您处理运行o/s命令和管理stdin/out/err。

更多信息请看这里:https://amoffat.github.io/sh/

不是最简洁的解决方案,但它只有一行,并且使用了标准的包。

下面是获取HDFS目录清单的简化代码。它将列出类似的文件和文件夹,因此如果您需要区分它们,可能需要修改。

import sh
hdfsdir = '/somedirectory'
filelist = [ line.rsplit(None,1)[-1] for line in sh.hdfs('dfs','-ls',hdfsdir).split('n') if len(line.rsplit(None,1))][1:]

我的输出-在这种情况下,这些都是目录:

[u'/somedirectory/transaction_basket_fct/date_id=2015-01-01',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-02',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-03',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-04',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-05',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-06',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-07',
 u'/somedirectory/transaction_basket_fct/date_id=2015-01-08']

让我们来分析一下:

要运行hdfs dfs -ls /somedirectory命令,我们可以像这样使用sh包:

import sh
sh.hdfs('dfs','-ls',hdfsdir)

sh允许您无缝地调用o/s命令,就好像它们是模块上的函数一样。将命令参数作为函数参数传递。很整洁。

对我来说,这返回类似于:

Found 366 items
drwxrwx---+  - impala hive          0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-01
drwxrwx---+  - impala hive          0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-02
drwxrwx---+  - impala hive          0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-03
drwxrwx---+  - impala hive          0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-04
drwxrwx---+  - impala hive          0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-05

使用.split('n')

根据新的行字符将其分成几行

使用line.rsplit(None,1)[-1]获取字符串中的最后一个'word'

使用if len(line.rsplit(None,1))

来防止列表中出现空元素的问题

最后删除列表中的第一个元素(Found 366 items)使用[1:]

对于"查询HDFS上的文件名"使用python 3的原始子进程库:

from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().split('n')[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().split('n')[1:]][:-1]

我需要在我的电脑上安装什么?

你需要安装并运行Hadoop,当然还有Python。

如何查询HDFS上的文件名

你可以在这里试一下。我还没有测试过代码,所以不要依赖它。

from subprocess import Popen, PIPE
process = Popen('hdfs dfs -cat filename.dat',shell=True,stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
check for returncode, std_err
if:
    everything is OK, do whatever with stdout
else:
    do something in else condition

你也可以看看py,这是一个Hadoop的Python API。

虽然我的例子包括shell=true,你可以尝试不运行它,因为它是一个安全风险。为什么不应该使用shell=True ?

您应该可以登录到集群中的一个节点。让集群管理员选择节点并设置帐户,并告诉您如何安全地访问节点。如果您是管理员,请让我知道群集是本地的还是远程的,如果是远程的,那么它是托管在您的计算机上,在公司内部还是在第三方云上,如果是,谁,然后我可以提供更多相关信息。

要查询HDFS中的文件名,请登录集群节点,执行hadoop fs -ls [path]路径是可选的,如果没有提供,则列出主目录中的文件。如果-R作为一个选项提供,那么它将递归地列出path中的所有文件。该命令还有其他选项。有关此命令和其他Hadoop文件系统shell命令的更多信息,请参见http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html.

在Python中查询HDFS文件名的一种简单方法是使用esutil.hdfs.ls(hdfs_url='', recurse=False, full=False),它在子进程中执行hadoop fs -ls hdfs_url,加上它具有许多其他Hadoop文件系统shell命令的函数(参见http://code.google.com/p/esutil/source/browse/trunk/esutil/hdfs.py的源代码)。esutil可以与pip install esutil一起安装。在PyPI https://pypi.python.org/pypi/esutil文档在http://code.google.com/p/esutil/和GitHub网站https://github.com/esheldon/esutil。

正如JGC所说,最简单的事情你可以做的是首先登陆(通过ssh)的一个节点(服务器参与一个Hadoop集群)和验证正确的访问控制和特权:

  • 使用HDFS客户端列出您的主目录,即hdfs dfs -ls
  • 列出HDFS中感兴趣的目录,即hdfs dfs -ls <absolute or relative path to HDFS directory>

然后,在Python中,您应该使用子进程和HDFS客户端来访问感兴趣的路径,并使用-C标志来排除不必要的元数据(以避免稍后进行丑陋的后处理)。

Popen(['hdfs', 'dfs', '-ls', '-C', dirname])

之后,将输出拆分为新行,然后您将得到路径列表。

下面是一个日志记录和错误处理的例子(包括当目录/文件不存在时):

from subprocess import Popen, PIPE
import logging
logger = logging.getLogger(__name__)
FAILED_TO_LIST_DIRECTORY_MSG = 'No such file or directory'
class HdfsException(Exception):
    pass
def hdfs_ls(dirname):
    """Returns list of HDFS directory entries."""
    logger.info('Listing HDFS directory ' + dirname)
    proc = Popen(['hdfs', 'dfs', '-ls', '-C', dirname], stdout=PIPE, stderr=PIPE)
    (out, err) = proc.communicate()
    if out:
        logger.debug('stdout:n' + out)
    if proc.returncode != 0:
        errmsg = 'Failed to list HDFS directory "' + dirname + '", return code ' + str(proc.returncode)
        logger.error(errmsg)
        logger.error(err)
        if not FAILED_TO_LIST_DIRECTORY_MSG in err:
            raise HdfsException(errmsg)
        return []
    elif err:
        logger.debug('stderr:n' + err)
    return out.splitlines()
# dat_files will contain a proper Python list of the paths to the '.dat' files you mentioned above.
dat_files = hdfs_ls('/hdfs-dir-with-dat-files/')

@JGC的回答帮了大忙。我想要一个更透明的函数版本,而不是一个更难读的一行代码;我还交换了字符串解析使用正则表达式,使其更透明,更不容易改变hdfs语法。这个版本看起来是这样的,与JGC相同的通用方法:

import re
import sh

def get_hdfs_files(directory:str) -> List[str]:
    '''
    Params: 
    directory: an HDFS directory e.g. /my/hdfs/location
    '''
    output = sh.hdfs('dfs','-ls',directory).split('n')
    files = []
    for line in output:
        match = re.search(f'({re.escape(directory)}.*$)', line)
        if match:
            files.append(match.group(0))
    return files

最新更新