Generic Avro deserializer for Flink: overriding getProducedType



I want to create a generic Avro deserializer and use it with Kafka and Flink.

To do that, I have to extend DeserializationSchema from the Flink API:

import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
class MyGenericAvroDeserializer[T](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
  extends DeserializationSchema[T] {
  override def isEndOfStream(nextElement: T): Boolean = false
  override def deserialize(message: Array[Byte]): T = {
    AvroInputStream.binary[T](new ByteArrayInputStream(message)).iterator.toSeq.head
  }
  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
}

This fails at compile time, because T is a type parameter rather than a concrete class:

class type required but T found
override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(classOf[T])
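
For context on the error: generic type parameters are erased on the JVM, so there is no Class object that classOf[T] could refer to. The following is a minimal sketch of that erasure, independent of Flink and avro4s; the names ErasureCheck and sameRuntimeClass are hypothetical and used only for illustration.

```scala
object ErasureCheck {
  // Compares the runtime classes of two values (hypothetical helper).
  def sameRuntimeClass(a: Any, b: Any): Boolean = a.getClass == b.getClass

  def main(args: Array[String]): Unit = {
    val ints: List[Int] = List(1, 2)
    val strings: List[String] = List("a", "b")
    // Both lists share one runtime class; the element type is not kept.
    println(sameRuntimeClass(ints, strings)) // true
  }
}
```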

Answering my own question: I had to obtain the runtime class through a ClassTag and cast the result with asInstanceOf, but it now works:

import java.io.ByteArrayInputStream
import com.sksamuel.avro4s.{AvroInputStream, FromRecord, SchemaFor, ToRecord}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import scala.reflect.{ClassTag, classTag}
class MyGenericAvroDeserializer[T: ClassTag](implicit schema: SchemaFor[T], toRecord: ToRecord[T], fromRecord: FromRecord[T])
  extends DeserializationSchema[T] {
  override def isEndOfStream(nextElement: T): Boolean = false
  override def deserialize(message: Array[Byte]): T = {
    AvroInputStream.binary[T](new ByteArrayInputStream(message)).iterator.toSeq.head
  }
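  // T is erased at runtime, so the ClassTag supplies the Class object.
  // runtimeClass is a Class[_], so the result is cast back to TypeInformation[T].
  override def getProducedType: TypeInformation[T] =
    TypeExtractor.getForClass(classTag[T].runtimeClass).asInstanceOf[TypeInformation[T]]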
}
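
The ClassTag mechanism used above can be tried in isolation. This is a minimal sketch with no Flink or avro4s dependency; ClassTagDemo, runtimeClassOf and Event are hypothetical names chosen for the example, not part of either library.

```scala
import scala.reflect.{ClassTag, classTag}

object ClassTagDemo {
  // Does not compile ("class type required but T found"):
  // def broken[T]: Class[_] = classOf[T]

  // The context bound asks the compiler to pass an implicit ClassTag[T],
  // which carries the runtime class of T to the call site.
  def runtimeClassOf[T: ClassTag]: Class[_] = classTag[T].runtimeClass

  // Hypothetical record type standing in for an Avro case class.
  final case class Event(id: Int)

  def main(args: Array[String]): Unit = {
    println(runtimeClassOf[String] == classOf[String]) // true
    println(runtimeClassOf[Event] == classOf[Event])   // true
  }
}
```

The result is typed as Class[_] rather than Class[T], which is why the deserializer needs the asInstanceOf cast after calling TypeExtractor.getForClass.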
