Spark cannot read a whole HBase row; only the value of the last attribute is returned



Why can't I get the complete HBase data in my terminal?

host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf)
hbase_rdd.collect()
[('1', '23'), ('2', '24'), ('3', '10')]

The original data in HBase looks like this:

ROW                   COLUMN+CELL                                               
1                    column=info:age, timestamp=1525153512915, value=23        
1                    column=info:gender, timestamp=1525153501730, value=F      
1                    column=info:name, timestamp=1525153481472, value=lihuan   
2                    column=info:age, timestamp=1525153553378, value=24        
2                    column=info:gender, timestamp=1525153542869, value=F      
2                    column=info:name, timestamp=1525153531737, value=sunzhesi 
3                    column=info:age, timestamp=1525157971696, value=10        
3                    column=info:gender, timestamp=1525157958967, value=M      
3                    column=info:name, timestamp=1525157941132, value=axin

Environment: Ubuntu 16.04; Python 3.5.2; Spark 2.3.0; Hadoop 2.9.0; HBase 1.4.2

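I'm not actually sure what happens when you call newAPIHadoopRDD the way you do, but when I scan data from HBase I add "hbase.mapreduce.scan" to the conf so that the scan is explicit. So maybe try adding something like this: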

from binascii import b2a_base64
from py4j.java_gateway import java_import

# Expose the HBase classes on the JVM view of the running SparkContext (sc).
jvm = sc._gateway.jvm
java_import(jvm, "org.apache.hadoop.hbase.client.Scan")
java_import(jvm, "org.apache.hadoop.hbase.util.Bytes")
java_import(jvm, "org.apache.hadoop.hbase.protobuf.ProtobufUtil")
to_bytes = lambda x: jvm.Bytes.toBytesBinary(x)

# Build the Scan; replace the YOUR_* placeholders with your own values.
scan = jvm.Scan()
scan.setStartRow(to_bytes(YOUR_START_ROW))
scan.setStopRow(to_bytes(YOUR_STOP_ROW))
scan.addFamily(to_bytes(YOUR_COLUMN_FAMILY_KEY))

# Serialize the Scan and Base64-encode it for TableInputFormat.
scan_proto_bytes = jvm.ProtobufUtil.toScan(scan).toByteArray()
# bytes(...) rather than str(...): on Python 3, b2a_base64 needs a bytes-like object.
scan_str = b2a_base64(bytes(scan_proto_bytes)).decode("ascii").strip()
conf = {"hbase.mapreduce.inputtable": table,
        "hbase.mapreduce.scan": scan_str,
        "hbase.zookeeper.quorum": host}

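One detail worth checking on Python 3: "hbase.mapreduce.scan" is expected to hold the serialized Scan as a Base64 string, so the raw bytes have to be encoded directly, not their str() representation. Below is a minimal sketch of just that encoding step, using only the standard library. The helper name encode_scan_bytes and the two-byte sample payload are made up for illustration; a real payload would come from toByteArray() on the serialized Scan.

```python
from binascii import a2b_base64, b2a_base64


def encode_scan_bytes(raw):
    """Base64-encode serialized Scan bytes into a plain ASCII string.

    `raw` stands in for the byte array returned by toByteArray(); this
    helper is hypothetical and not part of HBase or Spark.
    """
    # bytes(raw) accepts both bytes and bytearray; str(raw) on Python 3
    # would instead yield the repr text "b'...'", which is not the payload.
    return b2a_base64(bytes(raw)).decode("ascii").strip()


# Stand-in payload; a real one would come from ProtobufUtil.toScan(scan).
sample = bytearray(b"\x0a\x01")
encoded = encode_scan_bytes(sample)
print(encoded)

# Round trip: decoding gives back the original bytes.
assert a2b_base64(encoded) == bytes(sample)
```

The strip() call drops the trailing newline that b2a_base64 appends, so the conf value is a single clean token.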