我正在尝试从SparkSQL中写入和读取Parquet文件。由于模式进化的原因,我想在写作和阅读中使用Avro模式。
我的理解是,使用例如AvroParquetWriter和Avro的Generic API,这在Spark之外(或在Spark内手动)是可能的。然而,我想使用SparkSQL的write()和read()方法(它们与DataFrameWriter和DataFrameReader一起使用),并且与SparkSQL很好地集成(我将编写和读取数据集)。
我一辈子都想不出该怎么做,我想知道这是否可能。SparkSQL镶木地板格式似乎唯一支持的选项是"压缩"one_answers"mergeSchema",即没有指定备用模式格式或备用模式的选项。换句话说,使用SparkSQL API似乎无法使用Avro模式读取/写入Parquet文件。但也许我只是错过了什么?
为了澄清,我还了解到,这基本上只会在写入时将Avro模式添加到Parquet元数据中,在读取时再添加一个翻译层(Parquet格式->Avro模式->SparkSQL内部格式),但会特别允许我为缺失的列添加默认值(Avro模式支持,但Parquet模式不支持)。
此外,我不想找到一种将Avro转换为Parquet或Parquet转换为Avro的方法(而是一种将它们一起使用的方法),也不想找到在SparkSQL中读/写普通Avro的方式(可以使用databricks/spark-Avro)。
我正在做类似的事情。我使用avro模式写入镶木地板文件,但不要将其读作avro。但同样的技巧也应该适用于阅读。我不确定这是否是最好的方法,但无论如何都是这样:我有AvroData.avsc,它有avro模式。
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder,Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)
kafkaArr.foreachRDD { (rdd,time)
=> { val schema = SchemaConverters.toSqlType(AvroData.getClassSchema).dataType.asInstanceOf[StructType] val ardd = rdd.mapPartitions{itr =>
itr.map { r =>
try {
val cr = avroToListWithAudit(r._2, offsetSaved, loadDate, timeNow.toString)
Row.fromSeq(cr.toArray)
} catch{
case e:Exception => LogHandler.log.error("Exception while converting to Avro" + e.printStackTrace())
System.exit(-1)
Row(0) //This is just to allow compiler to accept. On exception, the application will exit before this point
}
}
}
public static List avroToListWithAudit(byte[] kfkBytes, String kfkOffset, String loaddate, String loadtime ) throws IOException {
AvroData av = getAvroData(kfkBytes);
av.setLoaddate(loaddate);
av.setLoadtime(loadtime);
av.setKafkaOffset(kfkOffset);
return avroToList(av);
}
public static List avroToList(AvroData a) throws UnsupportedEncodingException{
List<Object> l = new ArrayList<>();
for (Schema.Field f : a.getSchema().getFields()) {
String field = f.name().toString();
Object value = a.get(f.name());
if (value == null) {
//System.out.println("Adding null");
l.add("");
}
else {
switch (f.schema().getType().getName()){
case "union"://System.out.println("Adding union");
l.add(value.toString());
break;
default:l.add(value);
break;
}
}
}
return l;
}
getAvroData方法需要有从原始字节构造avro对象的代码。我还试图找到一种方法来做到这一点,而不必显式地指定每个属性设置器,但似乎没有。
public static AvroData getAvroData (bytes)
{
AvroData av = AvroData.newBuilder().build();
try {
av.setAttr(String.valueOf("xyz"));
.....
}
}
希望它能帮助