如何从avro数据中提取文档并将其添加到数据帧中



我正在尝试基于HDFS中的avro文件创建hive/impala表。执行转换的工具是Spark。

我不能使用spark.read.format("avro")将数据加载到数据帧中,因为那样doc部分(列的描述(就会丢失。我可以通过以下操作查看文档:

input = sc.textFile("/path/to/avrofile")
avro_schema = input.first() # not sure what type it is 

问题是,它是一个嵌套的模式,我不知道如何遍历它来将doc映射到数据帧中的列描述。我想把doc添加到表的列描述中。例如,输入模式看起来像:

"fields": [
{
"name":"productName",
"type": [
"null",
"string"
],
"doc": "Real name of the product"
"default": null
},
{
"name" : "currentSellers",
"type": [
"null",
{
"type": "record",
"name": "sellers",
"fields":[
{
"name": "location",
"type":[
"null",
{
"type": "record"
"name": "sellerlocation",
"fields": [
{
"name":"locationName",
"type": [
"null",
"string"
],
"doc": "Name of the location",
"default":null
},
{
"name":"locationArea",
"type": [
"null",
"string"
],
"doc": "Area of the location",#The comment needs to be added to table comments
"default":null
.... #These are nested fields 

在最后的表中,例如一个字段名称将是currentSellers_locationName,列描述为"0";地点名称";。有人能帮忙解释一下如何解析模式并将文档添加到描述中吗?并解释一下下面的部分是关于领域之外的什么?非常感谢。如果我能更好地解释,请告诉我。

"name" : "currentSellers",
"type": [
"null",
{
"type": "record",
"name": "sellers",
"fields":[
{

如果您想自己解析模式并手动向spark添加元数据,我建议使用flatdict包:

from flatdict import FlatterDict
flat_schema = FlatterDict(schema)  # schema as python dict
names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}
# keep only keys which are present in both names and docs
keys_with_doc = set(names.keys()) & set(docs.keys())
full_name = lambda key: '_'.join(
names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}

flat_schema.keys()中的一组典型密钥是:

'fields:1:type:1:fields:0:type:1:fields:0:type:1',
'fields:1:type:1:fields:0:type:1:fields:0:name',
'fields:1:type:1:fields:0:type:1:fields:0:default',
'fields:1:type:1:fields:0:type:1:fields:0:doc',

现在可以操作这些字符串:

  1. 只提取以";name";以及";doc";(忽略"默认"等(
  2. get-set-intersection以删除不同时存在两个字段的字段
  3. 从更高层次获得所有字段名称的列表:fields:1:type:1:fieldsfields:1:type:1:fields:0:type:1:fields的父级之一(条件是它们有相同的开始,并且以"字段"结束(

查看:https://github.com/apache/spark/blob/master/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L58

Spark对模式转换器的实现似乎没有调用org.apache.avro.schema对象中的getDoc函数,也没有调用它通过迭代的org.apache.afro.schema.Field对象中的doc函数

它只读取类型元数据,并尝试将其转换为Spark的内部类型对象(StructType、StringType、LongType等(

我能想到的只有两个解决方案:

  1. 自己读取一个或一些文件并解析模式
  2. 将模式保存在一个单独的文件中,该文件应该足够容易解析,因为它是纯JSON

来自@bzu 的代码

from flatdict import FlatterDict
flat_schema = FlatterDict(schema)  # schema as python dict
names = {k.replace(':name', ''): flat_schema[k] for k in flat_schema if k.endswith(':name')}
docs = {k.replace(':doc', ''): flat_schema[k] for k in flat_schema if k.endswith(':doc')}
keys_with_doc = set(names.keys()) & set(docs.keys())
full_name = lambda key: '_'.join(
names[k] for k in sorted(names, key=len) if key.startswith(k) and k.split(':')[-2] == 'fields'
)
name_doc_map = {full_name(k): docs[k] for k in keys_with_doc}

解释

FlatterDict

例如:

value = flatdict.FlatterDict({'list': ['a', 'b', 'c']})

将与相同

value == {'list:0': 'a', 'list:1': 'b', 'list:2': 'c'}

所以基本上你的嵌套列表基本上是这样的:

{"fields0": ..., "fields1": ...}

有关FlatterDict的更多信息,请点击此处

set(names.keys()) & set(docs.keys())

names.keys()docs.keys从字典中获取所有键,目前有多个值,因此我们必须将它们放入一个集合中,以分组为一个变量。然后,该行使用&对这两个集合执行逐位运算符,基本上找到两个集合中的所有重复项,并将它们放入keys_with_doc中。例如

a={1:"a", 3:"c", 4:"d"}
b={1:"a", 2:"b"}
name=set(a.keys()) & set(b.keys())
name
>>> {1}

钥匙在左边:

dict = {1:"a",2:"b"}

full_name

这个lambda函数很棘手。它加入了一堆"_&";。这个";"一堆东西";是names的字典,循环将遍历该字典,并使用key=len和if语句从已排序的names字典中提取names[k]。基本上,通过在name_doc_map中使用这个lambda,您将创建一个新的字典,该字典使用原始集合keys_with_docdocs中的条件。

最新更新