Avro架构将激发StructType



这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。

我正在处理一个Spark数据帧,它可能从几个不同的模式版本之一加载数据:

// Version One
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null}
 ]
}
// Version Two
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]
}

我正在使用Spark Avro加载数据。

DataFrame df = context.read()
  .format("com.databricks.spark.avro")
  .load("path/to/avro/file");

其可以是版本一文件或版本二文件。然而,我希望能够以相同的方式处理它,将未知值设置为"null"。我在上一个问题中的建议是设置模式,但我不想重复自己在.avro文件中以及在StructType和朋友之间编写模式。如何将avro模式(文本文件或生成的MeObject.getClassSchema())转换为sparks StructType

Spark Avro有一个SchemaConverters,但它都是私有的,并返回一些奇怪的内部对象。

免责声明:这是一种肮脏的黑客攻击。这取决于几件事:

  • Python提供了一个轻量级的Avro处理库,由于其动态性,它不需要类型化的编写器
  • 空的Avro文件仍然是有效的文档
  • Spark模式可以转换为JSON和从JSON转换

以下代码读取一个Avro模式文件,创建一个具有给定模式的空Avro文件,使用spark-csv读取它,并将Spark模式输出为JSON文件。

import argparse
import tempfile
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from pyspark import SparkContext
from pyspark.sql import SQLContext
def parse_schema(schema):
    with open(schema) as fr:
        return avro.schema.parse(open(schema).read())
def write_dummy(schema):
    tmp = tempfile.mktemp(suffix='.avro')
    with open(tmp, "w") as fw:
        writer = DataFileWriter(fw, DatumWriter(), schema)
        writer.close()
    return tmp
def write_spark_schema(path, schema):
    with open(path, 'w') as fw:
        fw.write(schema.json())

def main():
    parser = argparse.ArgumentParser(description='Avro schema converter')
    parser.add_argument('--schema')
    parser.add_argument('--output')
    args = parser.parse_args()
    sc = SparkContext('local[1]', 'Avro schema converter')
    sqlContext = SQLContext(sc)
    df = (sqlContext.read.format('com.databricks.spark.avro')
            .load(write_dummy(parse_schema(args.schema))))
    write_spark_schema(args.output, df.schema)
    sc.stop()

if __name__ == '__main__':
    main()

用法:

bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1  
   avro_to_spark_schema.py 
   --schema path_to_avro_schema.avsc 
   --output path_to_spark_schema.json

读取模式:

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}
val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]

请看看这是否有帮助,尽管有点晚。为了我目前的工作,我一直在努力。我使用了Databricks的schemaconverter。我想,您正试图用给定的模式读取avro文件。

 val schemaObj = new Schema.Parser().parse(new File(avscfilepath));
 var sparkSchema : StructType = new StructType
 import scala.collection.JavaConversions._     
 for(field <- schemaObj.getFields()){
  sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType)
 }
 sparkSchema

使用PySpark:

with open('path/to/avro/file','r') as avro_file:
        avro_scheme = avro_file.read()
    
    df = spark
        .read
        .format("avro")
        .option("avroSchema", avro_scheme)
        .load()
    
    df.schema

相关内容

  • 没有找到相关文章

最新更新