我正在尝试基于HDFS中的avro文件创建hive/impala表。执行转换的工具是Spark。
我不能使用spark.read.format("avro")
将数据加载到数据帧中,因为那样doc
部分(列的描述(就会丢失。我可以通过以下操作查看文档:
input = sc.textFile("/path/to/avrofile")
avro_schema = input.first() # not sure what type it is
问题是,它是一个嵌套的模式,我不知道如何遍历它来将doc
映射到数据帧中的列描述。我想把doc
添加到表的列描述中。例如,输入模式看起来像:
"fields": [
{
"name":"productName",
"type": [
"null",
"string"
],
"doc": "Real name of the product"
"default": null
},
{
"name" : "currentSellers",
"type": [
"null",
{
"type": "record",
"name": "sellers",
"fields":[
{
"name": "location",
"type":[
"null",
{
"type": "record"
"name": "sellerlocation",
"fields": [
{
"name":"locationName",
"type": [
"null",
"string"
],
"doc": "Name of the location",
"default":null
},
{
"name":"locationArea",
"type": [
"null",
"string"
],
"doc": "Area of the location",#The comment needs to be added to table comments
"default":null
.... #These are nested fields
在最后的表中,例如一个字段名称将是currentSellers_locationName
,列描述为"0";地点名称";。有人能帮忙解释一下如何解析模式并将文档添加到描述中吗?并解释一下下面的部分是关于领域之外的什么?非常感谢。如果我能更好地解释,请告诉我。
"name" : "currentSellers",
"type": [
"null",
{
"type": "record",
"name": "sellers",
"fields":[
{
如果您想自己解析模式并手动向spark添加元数据,我建议使用flatdict
包:
from flatdict import FlatterDict
flat_schema = FlatterDict(schema) # schema as python dict
names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}
# keep only keys which are present in both names and docs
keys_with_doc = set(names.keys()) & set(docs.keys())
full_name = lambda key: '_'.join(
names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}
flat_schema.keys()
中的一组典型密钥是:
'fields:1:type:1:fields:0:type:1:fields:0:type:1',
'fields:1:type:1:fields:0:type:1:fields:0:name',
'fields:1:type:1:fields:0:type:1:fields:0:default',
'fields:1:type:1:fields:0:type:1:fields:0:doc',
现在可以操作这些字符串:
- 只提取以";name";以及";doc";(忽略"默认"等(
- get-set-intersection以删除不同时存在两个字段的字段
- 从更高层次获得所有字段名称的列表:
fields:1:type:1:fields
是fields:1:type:1:fields:0:type:1:fields
的父级之一(条件是它们有相同的开始,并且以"字段"结束(
查看:https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L58
Spark对模式转换器的实现似乎没有调用org.apache.avro.schema对象中的getDoc函数,也没有调用它通过迭代的org.apache.afro.schema.Field对象中的doc函数
它只读取类型元数据,并尝试将其转换为Spark的内部类型对象(StructType、StringType、LongType等(
我能想到的只有两个解决方案:
- 自己读取一个或一些文件并解析模式
- 将模式保存在一个单独的文件中,该文件应该足够容易解析,因为它是纯JSON
来自@bzu 的代码
from flatdict import FlatterDict
flat_schema = FlatterDict(schema) # schema as python dict
names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}
keys_with_doc = set(names.keys()) & set(docs.keys())
full_name = lambda key: '_'.join(
names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}
解释
FlatterDict
例如:
value = flatdict.FlatterDict({'list': ['a', 'b', 'c']})
将与相同
value == {'list:0': 'a', 'list:1': 'b', 'list:2': 'c'}
所以基本上你的嵌套列表基本上是这样的:
{"fields0": ..., "fields1": ...}
有关FlatterDict
的更多信息,请点击此处
set(names.keys()) & set(docs.keys())
names.keys()
和docs.keys
从字典中获取所有键,目前有多个值,因此我们必须将它们放入一个集合中,以分组为一个变量。然后,该行使用&
对这两个集合执行逐位运算符,基本上找到两个集合中的所有重复项,并将它们放入keys_with_doc
中。例如
a={1:"a", 3:"c", 4:"d"}
b={1:"a", 2:"b"}
name=set(a.keys()) & set(b.keys())
name
>>> {1}
钥匙在左边:
dict = {1:"a",2:"b"}
full_name
这个lambda函数很棘手。它加入了一堆"_&";。这个";"一堆东西";是names
的字典,循环将遍历该字典,并使用key=len
和if语句从已排序的names
字典中提取names[k]
。基本上,通过在name_doc_map
中使用这个lambda,您将创建一个新的字典,该字典使用原始集合keys_with_doc
和docs
中的条件。