我正在努力在Scala中创建一个通用的AvroSerde。我将把这个serde与Flink结合使用,因此这个serde本身也应该是可序列化的。Avro 对 Scala 没有任何原生支持,但是有一些库可以使用无形状从案例类转换为泛型记录。注意:此泛型序列化程序将仅使用案例类进行实例化。
首先,我尝试使用 Avro4s 实现这个 serde。通过确保泛型类型绑定到FromRecord
和RecordFrom
的上下文,我很容易编译它,但是FromRecord
和RecordFrom
都不可序列化,因此我不能在 Flink 中使用此 serde。
目前,我正在尝试一种不同的库无形状数据类型,它也使用无形状。我当前的代码如下所示:
class Serializer[T : TypeTag : ClassTag] {
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
def serialize(value : T) : Array[Byte] = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
def deserialize(message: Array[Byte]) : T = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
}
所以基本上我创建了一个AvroType[T]
它有两种方法fromGenericRecord
和toGenericRecord
(源(。这些方法需要一些隐式:LabelledGeneric.Aux[A, L]
、ToAvroRecord[L]
、tt: TypeTag[A]
和fromL: FromAvroRecord[L]
。
目前,由于缺少这些隐式,此代码会给出编译错误:
Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
val genericRecord = avroType.toGenericRecord(value)
简单地重载toGenericRecord
和fromGenericRecord
方法中的隐式并不能解决它,因为那样我需要参数化serialize[L <: Hlist]
和deserialize[L <: Hlist]
我不能这样做,因为 Flink 不允许这些方法具有类型。
我对无形状和隐式几乎没有经验来理解我需要哪些上下文边界来解决这个问题,同时保持这个类可序列化。
希望有人可以帮助或指出我一些有用的资源。
谢谢 沃特
编辑
我不能通过方法传递隐式内容,也不能使它们参数化,因为我需要将 serde 基于 Flink 的序列化接口,这迫使我覆盖:byte[] serialize(T element)
和T deserialize(byte[] message)
如果我尝试将隐式传递给类本身,则需要将其更改为:
class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])
但是如果我像这样实例化它:
case class Test(str: String)
val serializer = new Serializer[Test]
我收到此编译错误:
Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
你应该Serializer
做一个类型类。 (顺便说一下,没有必要使用var
s 是一种不好的做法。
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}
trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
type L <: HList
}
object Serializer {
type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }
def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer
implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
gen: LabelledGeneric.Aux[T, L0],
toL: ToAvroRecord[L0],
fromL: FromAvroRecord[L0]): Aux[T, L0] =
new Serializer[T] {
type L = L0
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
override def serialize(value : T) : Array[Byte] = {
val schema: Schema =
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
else ReflectData.get().getSchema(inputClassType)
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
override def deserialize(message: Array[Byte]) : T = {
val schema: Schema =
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
else ReflectData.get().getSchema(inputClassType)
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
override def isEndOfStream(nextElement: T): Boolean = ???
override def getProducedType: TypeInformation[T] = ???
}
}
case class Test(str: String)
val serializer = Serializer[Test]