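I have a top-level function that takes a tuple containing the path to a Parquet file and a column name.
The function loads only that column from the file, converts it to pandas, and then packs/serializes it into a standard form. Something like this: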
    import pyarrow as pa
    import pyarrow.parquet as pq
    from multiprocessing import Pool

    def binarizer(file_data_tuple):
        '''Read one Parquet column from a file, binarize it, and return it.'''
        path, col_name, col_meta, native = file_data_tuple
        if not native:
            # Either this or using a top-level hdfs_con
            hdfs_con = pa.hdfs.connect(params)
        read_pq = pq.read_table if native else hdfs_con.read_parquet
        # Load only the requested column
        arrow_col = read_pq(path, columns=(col_name,))
        bin_col = imported_binarizng_function(arrow_col)
        return bin_col

    def read_binarize_parallel(filepaths):
        '''Set up parallel reading and binarizing of a Parquet file.'''
        # list of tuples containing the filepath, column name, meta, and mode
        pool_params = [(), ...]
        pool = Pool()
        for file in filepaths:
            bin_cols = pool.map(binarizer, pool_params)
            chunk = b''.join(bin_cols)
            send_over_socket(chunk)
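This works when I use native mode, i.e. when reading the files from the local file system.
However, if I try to read from HDFS, I get Arrow errors that look strange (to me), both when I open a connection in each process and when I try to reuse the same connection. Here is a condensed version of the error: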
    [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
    [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
    [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
    [libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
    2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
        @ Unknown
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
        @ Unknown
        @ Unknown
        @ Unknown
    binarizing process filepath: /parquet_430mb/5e6.parquet
        @ Unknown
        @ Unknown
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.939864, p10013, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.940813, p10014, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000":
    RpcChannel.cpp: 393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
        @ Unknown
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
    Caused by
    TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
        @ Unknown
        @ Unknown
        @ Unknown
        @ arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
        @ __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
        @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
        @ Unknown
        @ Unknown
        @ Unknown
    2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
    ---------------------------------------------------------------------------
    ArrowIOError                              Traceback (most recent call last)

    /home/parquet_sender.pyc in insert_files_parallel(self)
        374         # print ('372 sqparquet filepath:', filepath)
        375         params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params]
    --> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
        377         got ('map complete')
        378         num_rows = bin_col[0][2]

    /usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
        249         '''
        250         assert self._state == RUN
    --> 251         return self.map_async(func, iterable, chunksize).get()
        252
        253     def imap(self, func, iterable, chunksize=1):

    /usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
        556             return self._value
        557         else:
    --> 558             raise self._value
        559
        560     def _set(self, i, obj):

    ArrowIOError: HDFS: GetPathInfo failed
I'd be glad to get any feedback on the cause of this error, and on how I should go about loading Parquet in parallel.
This is a bug related to multiprocessing serialization details. I opened a bug report here: https://issues.apache.org/jira/browse/ARROW-1986
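
Since the problem involves how objects are serialized between processes, one general pattern worth trying is to keep the connection out of the pickled task data altogether: each worker builds its own connection once, in a Pool initializer, and the tasks carry only plain values such as paths and column names. The sketch below illustrates that pattern only. FakeConnection, init_worker, binarize_task and read_parallel are hypothetical names, and FakeConnection is a stand-in for a real client (for example whatever pa.hdfs.connect returns). Whether this sidesteps the bug tracked in ARROW-1986 is not confirmed here. It also assumes Python 3, where Pool can be used as a context manager.

```python
import os
from multiprocessing import Pool


class FakeConnection:
    """Hypothetical stand-in for a real per-process client (e.g. an HDFS connection)."""

    def __init__(self):
        # Record which process created this connection.
        self.owner_pid = os.getpid()

    def read_column(self, path, col_name):
        # A real client would read the column; here we just return tagged bytes.
        return "{}:{}".format(path, col_name).encode("utf-8")


# One connection per worker process; never sent through pickle.
_worker_con = None


def init_worker():
    """Runs once in each worker when the pool starts."""
    global _worker_con
    _worker_con = FakeConnection()


def binarize_task(task):
    """Worker function: the task holds only plain, picklable values."""
    path, col_name = task
    return _worker_con.read_column(path, col_name)


def read_parallel(tasks, processes=2):
    """Map the tasks over a pool whose workers each own a connection."""
    with Pool(processes=processes, initializer=init_worker) as pool:
        return b"".join(pool.map(binarize_task, tasks))
```

Calling read_parallel([(path, col_name), ...]) from under an `if __name__ == "__main__":` guard would fan the columns out to the workers; the connection object itself never crosses a process boundary, only the task tuples and the returned bytes do.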