Multiprocessing with Pyarrow's HdfsClient



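I have a top-level function that takes a tuple containing the path to a Parquet file and a column name.

The function loads only that column from the file, converts it to pandas, and then packs/serializes it into a standard form. Something like this: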

import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool


def binarizer(file_data_tuple):
    '''Read a Parquet column from a file, binarize and return'''
    path, col_name, col_meta, native = file_data_tuple
    if not native:
        # Either this or using a top level hdfs_con
        hdfs_con = pa.hdfs.connect(params)
    # Pick the reader: local filesystem in native mode, HDFS otherwise
    read_pq = pq.read_table if native else hdfs_con.read_parquet
    arrow_col = read_pq(path, columns=(col_name,))
    bin_col = imported_binarizng_function(arrow_col)
    return bin_col


def read_binarize_parallel(filepaths):
    '''Setup parallel reading and binarizing of a parquet file'''
    # list of tuples containing the filepath, column name, meta, and mode
    pool_params = [(), ...]
    pool = Pool()
    for file in filepaths:
        bin_cols = pool.map(binarizer, pool_params)
        chunk = b''.join(bin_cols)
        send_over_socket(chunk)

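This works when I use native mode, i.e. when reading files from the local filesystem.

However, if I try to read from HDFS, I get strange (to me) Arrow errors, both when I open a connection in each process and when I try to reuse the same connection. Here is a compressed version of the error: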

[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
    @   Unknown
    @   Unknown
    @   Unknown
binarizing process filepath: /parquet_430mb/5e6.parquet
    @   Unknown
    @   Unknown
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939864, p10013, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.940813, p10014, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
    @   Unknown
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
Caused by
TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
    @   Unknown
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
---------------------------------------------------------------------------
ArrowIOError                              Traceback (most recent call last)

/home/parquet_sender.pyc in insert_files_parallel(self)
    374         # print ('372 sqparquet filepath:', filepath)
    375         params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
--> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
    377         got ('map complete')
    378         num_rows = bin_col[0][2]

/usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
    249         '''
    250         assert self._state == RUN
--> 251         return self.map_async(func, iterable, chunksize).get()
    252
    253     def imap(self, func, iterable, chunksize=1):

/usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
    556             return self._value
    557         else:
--> 558             raise self._value
    559
    560     def _set(self, i, obj):

ArrowIOError: HDFS: GetPathInfo failed

I'd be glad to get any feedback on the cause of this error, and on how I should go about loading Parquet files in parallel.

This is a bug related to multiprocessing serialization details. I opened a bug report here: https://issues.apache.org/jira/browse/ARROW-1986
