我希望使用Spark流将protobuf消息存储在Hbase/HDFS中。我有以下两个问题
- 存储大量原生生物的有效方法是什么 消息以及检索它们以执行某些操作的有效方法 分析学?例如,它们应该存储在 Hbase 中的字符串/字节[]中,还是应该存储在 HDFS 中的镶木地板文件中等。
- 原型的层次结构应该如何 消息是否存储?我的意思是,嵌套元素是否应该展平 在存储之前取出,还是有任何机制可以按原样存储它们? 如果嵌套元素是集合或映射,则它们应该是 分解并存储为多行?
Protobuf 消息的示例结构如下所示
> +--MsgNode-1
> +--Attribute1 - String
> +--Attribute2 - Int
> +--MsgNode-2
> +--Attribute1 - String
> +--Attribute2 - Double
> +--MsgNode-3 - List of MsgNode-3's
> +--Attribute1 - Int
我计划使用Spark流将protobuf消息收集为字节并将它们存储在Hbase/HDFS中。
问题 1 :
存储大量 protobuf 消息的有效方法是什么 以及检索它们以进行某些分析的有效方法?为 例如,它们应该在 Hbase 中存储为 Strings/byte[] 还是应该 它们存储在 HDFS 等的镶木地板文件中。
我会推荐 - 将原型存储为镶木地板AVRO文件(使用AVRO模式拆分为有意义的消息)。
这可以使用数据帧 api spark 1.5 及更高版本(PartiotionBy
SaveMode.Append
)来实现
查看这个强大的大数据三重奏
如果您存储为字符串或字节数组,则无法直接进行数据分析(对原始数据进行查询)。
如果您使用的是 cloudera,impala(支持 parquet-avro)可用于查询原始数据。
问题2:
原型消息的层次结构应该如何 数据处理?我的意思是,嵌套元素是否应该在之前展平 存储,或者是否有任何机制可以按原样存储它们?如果嵌套 元素是集合或贴图,如果它们被分解并存储为 多行?
如果您以来自 Spark 流的原始格式存储数据,您将如何查询业务是否想要查询并知道他们收到了什么样的数据(此要求非常常见)。
首先,您必须了解您的数据(即不同消息之间的关系与 protobuf 中,以便您可以决定单行或多行),然后开发 protobuf 解析器来解析 protobuf 的消息结构。 根据您的数据,将其转换为 avro 通用记录以另存为镶木地板文件。
提示:
Protobuf 解析器可以根据您的要求以不同的方式开发。 其中一种通用方式如下例所示。
public SortedMap<String, Object> convertProtoMessageToMap(GeneratedMessage src) {
final SortedMap<String, Object> finalMap = new TreeMap<String, Object>();
final Map<FieldDescriptor, Object> fields = src.getAllFields();
for (final Map.Entry<FieldDescriptor, Object> fieldPair : fields.entrySet()) {
final FieldDescriptor desc = fieldPair.getKey();
if (desc.isRepeated()) {
final List<?> fieldList = (List<?>) fieldPair.getValue();
if (fieldList.size() != 0) {
final List<String> arrayListOfElements = new ArrayList<String>();
for (final Object o : fieldList) {
arrayListOfElements.add(o.toString());
}
finalMap.put(desc.getName(), arrayListOfElements);
}
} else {
finalMap.put(desc.getName(), fieldPair.getValue().toString());
}
}
return finalMap;
}