Hadoop新手。
我已经搜索了一些关于如何开始使用hadoop和python的教程,但没有多少成功。我还不需要对映射器和reducer做任何工作,但这更多的是一个访问问题。
作为Hadoop集群的一部分,HDFS上有一堆。dat文件。
为了在我的客户端(本地计算机)上使用Python访问这些文件,
我需要在我的电脑上安装什么?
如何查询HDFS上的文件名
任何链接也会有帮助。
据我所知,没有开箱即用的解决方案,我发现的大多数答案都诉诸于调用hdfs
命令。我在Linux上运行,遇到了同样的挑战。我发现sh
包很有用。这将为您处理运行o/s命令和管理stdin/out/err。
更多信息请看这里:https://amoffat.github.io/sh/
不是最简洁的解决方案,但它只有一行,并且使用了标准的包。
下面是获取HDFS目录清单的简化代码。它将列出类似的文件和文件夹,因此如果您需要区分它们,可能需要修改。
import sh
hdfsdir = '/somedirectory'
filelist = [ line.rsplit(None,1)[-1] for line in sh.hdfs('dfs','-ls',hdfsdir).split('n') if len(line.rsplit(None,1))][1:]
我的输出-在这种情况下,这些都是目录:
[u'/somedirectory/transaction_basket_fct/date_id=2015-01-01',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-02',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-03',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-04',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-05',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-06',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-07',
u'/somedirectory/transaction_basket_fct/date_id=2015-01-08']
让我们来分析一下:
要运行hdfs dfs -ls /somedirectory
命令,我们可以像这样使用sh
包:
import sh
sh.hdfs('dfs','-ls',hdfsdir)
sh
允许您无缝地调用o/s命令,就好像它们是模块上的函数一样。将命令参数作为函数参数传递。很整洁。
对我来说,这返回类似于:
Found 366 items
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-01
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-02
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-03
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-04
drwxrwx---+ - impala hive 0 2016-05-10 13:52 /somedirectory/transaction_basket_fct/date_id=2015-01-05
使用.split('n')
使用line.rsplit(None,1)[-1]
获取字符串中的最后一个'word'
使用if len(line.rsplit(None,1))
最后删除列表中的第一个元素(Found 366 items
)使用[1:]
对于"查询HDFS上的文件名"使用python 3的原始子进程库:
from subprocess import Popen, PIPE
hdfs_path = '/path/to/the/designated/folder'
process = Popen(f'hdfs dfs -ls -h {hdfs_path}', shell=True, stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
list_of_file_names = [fn.split(' ')[-1].split('/')[-1] for fn in std_out.decode().split('n')[1:]][:-1]
list_of_file_names_with_full_address = [fn.split(' ')[-1] for fn in std_out.decode().split('n')[1:]][:-1]
我需要在我的电脑上安装什么?
你需要安装并运行Hadoop,当然还有Python。
如何查询HDFS上的文件名
你可以在这里试一下。我还没有测试过代码,所以不要依赖它。
from subprocess import Popen, PIPE
process = Popen('hdfs dfs -cat filename.dat',shell=True,stdout=PIPE, stderr=PIPE)
std_out, std_err = process.communicate()
check for returncode, std_err
if:
everything is OK, do whatever with stdout
else:
do something in else condition
你也可以看看py,这是一个Hadoop的Python API。
虽然我的例子包括shell=true
,你可以尝试不运行它,因为它是一个安全风险。为什么不应该使用shell=True
?
您应该可以登录到集群中的一个节点。让集群管理员选择节点并设置帐户,并告诉您如何安全地访问节点。如果您是管理员,请让我知道群集是本地的还是远程的,如果是远程的,那么它是托管在您的计算机上,在公司内部还是在第三方云上,如果是,谁,然后我可以提供更多相关信息。
要查询HDFS中的文件名,请登录集群节点,执行hadoop fs -ls [path]
。路径是可选的,如果没有提供,则列出主目录中的文件。如果-R
作为一个选项提供,那么它将递归地列出path中的所有文件。该命令还有其他选项。有关此命令和其他Hadoop文件系统shell命令的更多信息,请参见http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html.
在Python中查询HDFS文件名的一种简单方法是使用esutil.hdfs.ls(hdfs_url='', recurse=False, full=False)
,它在子进程中执行hadoop fs -ls hdfs_url
,加上它具有许多其他Hadoop文件系统shell命令的函数(参见http://code.google.com/p/esutil/source/browse/trunk/esutil/hdfs.py的源代码)。esutil可以与pip install esutil
一起安装。在PyPI https://pypi.python.org/pypi/esutil文档在http://code.google.com/p/esutil/和GitHub网站https://github.com/esheldon/esutil。
正如JGC所说,最简单的事情你可以做的是首先登陆(通过ssh
)的一个节点(服务器参与一个Hadoop集群)和验证正确的访问控制和特权:
- 使用HDFS客户端列出您的主目录,即
hdfs dfs -ls
- 列出HDFS中感兴趣的目录,即
hdfs dfs -ls <absolute or relative path to HDFS directory>
然后,在Python中,您应该使用子进程和HDFS客户端来访问感兴趣的路径,并使用-C
标志来排除不必要的元数据(以避免稍后进行丑陋的后处理)。
。Popen(['hdfs', 'dfs', '-ls', '-C', dirname])
之后,将输出拆分为新行,然后您将得到路径列表。
下面是一个日志记录和错误处理的例子(包括当目录/文件不存在时):
from subprocess import Popen, PIPE
import logging
logger = logging.getLogger(__name__)
FAILED_TO_LIST_DIRECTORY_MSG = 'No such file or directory'
class HdfsException(Exception):
pass
def hdfs_ls(dirname):
"""Returns list of HDFS directory entries."""
logger.info('Listing HDFS directory ' + dirname)
proc = Popen(['hdfs', 'dfs', '-ls', '-C', dirname], stdout=PIPE, stderr=PIPE)
(out, err) = proc.communicate()
if out:
logger.debug('stdout:n' + out)
if proc.returncode != 0:
errmsg = 'Failed to list HDFS directory "' + dirname + '", return code ' + str(proc.returncode)
logger.error(errmsg)
logger.error(err)
if not FAILED_TO_LIST_DIRECTORY_MSG in err:
raise HdfsException(errmsg)
return []
elif err:
logger.debug('stderr:n' + err)
return out.splitlines()
# dat_files will contain a proper Python list of the paths to the '.dat' files you mentioned above.
dat_files = hdfs_ls('/hdfs-dir-with-dat-files/')
@JGC的回答帮了大忙。我想要一个更透明的函数版本,而不是一个更难读的一行代码;我还交换了字符串解析使用正则表达式,使其更透明,更不容易改变hdfs语法。这个版本看起来是这样的,与JGC相同的通用方法:
import re
import sh
def get_hdfs_files(directory:str) -> List[str]:
'''
Params:
directory: an HDFS directory e.g. /my/hdfs/location
'''
output = sh.hdfs('dfs','-ls',directory).split('n')
files = []
for line in output:
match = re.search(f'({re.escape(directory)}.*$)', line)
if match:
files.append(match.group(0))
return files