Why doesn't the spark.implicits._ import help with encoder derivation inside a method?



Importing implicit members from an already created instance works fine:

object Test extends App {
  class Bag {
    implicit val ssss: String = "omg"
  }
  def call(): Unit = {
    val bag = new Bag
    import bag._
    val s = implicitly[String]
    println(s)
  }
  call()
}

But if I try to do the same with spark.implicits._:

object Test extends App {
  val spark: SparkSession = ...
  def call(): Unit = {
    import spark.implicits._
    case class Person(id: Long, name: String)
    // I can summon an existing encoder
    // val enc = implicitly[Encoder[Long]]
    // but encoder derivation is failing for some reason
    // val encP = implicitly[Encoder[Person]]
    val df: Dataset[Person] =
      spark.range(10).map(i => Person(i, i.toString))
    df.show()
  }
}

it fails to derive Encoder[Person]:

Unable to find encoder for type Person. An implicit Encoder[Person] is needed to store Person instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    .map(i => Person(i, i.toString))

However, if I create the Dataset outside the method, it works:

object Test extends App {
  val spark: SparkSession = ...
  import spark.implicits._
  case class Person(id: Long, name: String)
  val df: Dataset[Person] =
    spark.range(10).map(i => Person(i, i.toString))
  df.show()
}

Tested with Scala versions 2.13.10 and 2.12.17 and Spark version 3.3.1.

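The local case class is what causes this behavior. Local classes have so-called free types. You can try to provide a TypeTag for Person in the local scope and see whether that helps.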

As you have already found out yourself, the local Person has no TypeTag. But it does have a WeakTypeTag (and a ClassTag). Let's try to define an Encoder for such a class.
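To make the TypeTag / WeakTypeTag / ClassTag distinction concrete, here is a minimal sketch that needs only scala-reflect, not Spark. The names LocalTagsDemo and inspect are made up for this illustration. It compares the type obtained from the WeakTypeTag of a local class with the type rebuilt from its ClassTag:

```scala
import scala.reflect.classTag
import scala.reflect.runtime.universe._

// Hypothetical demo object, not part of Spark or of the original post.
object LocalTagsDemo {
  private val mirror = runtimeMirror(getClass.getClassLoader)

  /** Returns (weak type is free, class-derived type is free, JVM class name). */
  def inspect(): (Boolean, Boolean, String) = {
    case class Person(id: Long, name: String) // local to this method

    // typeTag[Person] would not compile here: "No TypeTag available for Person".
    val weak = weakTypeOf[Person]                 // compiles, but refers to a free type
    val cls = classTag[Person].runtimeClass       // the real JVM class of the local class
    val viaClass = mirror.classSymbol(cls).toType // a type backed by that JVM class

    (
      internal.isFreeType(weak.typeSymbol),
      internal.isFreeType(viaClass.typeSymbol),
      cls.getName
    )
  }
}
```

The point of the comparison is that the ClassTag still knows the actual runtime class, which is what the working encoder further down relies on.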

Naive ways of constructing a TypeTag do not work:

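How to create a TypeTag manually?

In Scala 2.12, why is none of the TypeTags created at runtime serializable?

Scala Spark Encoders.product[X] (where X is a case class) keeps giving me a "No TypeTag available for X" error

Spark: DF.as[Type] fails to compile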

implicit def ttag[A: WeakTypeTag]: TypeTag[A] = {
  val ttag = null // hiding implicit by name
  val wttagImpl = weakTypeTag[A].asInstanceOf[WeakTypeTag[A] {val mirror: Mirror; val tpec: TypeCreator}]
  TypeTag[A](wttagImpl.mirror, wttagImpl.tpec)
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found

https://gist.github.com/DmytroMitin/41b7439d2e504e37f29b02e3500d24b1

The result is similar with:

def typeToTypeTag[T](
  tpe: Type,
  mirror: api.Mirror[universe.type]
): TypeTag[T] = {
  TypeTag(mirror, new TypeCreator {
    def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = {
      assert(m eq mirror, s"TypeTag[$tpe] defined in $mirror cannot be migrated to $m.")
      tpe.asInstanceOf[U#Type]
    }
  })
}
implicit def ttag[T: WeakTypeTag]: TypeTag[T] = {
  val ttag = null // hiding implicit by name
  typeToTypeTag(weakTypeOf[T], mirror)
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found

https://gist.github.com/DmytroMitin/c7a24abf1ff1011a1c87aa9d161d6395

implicit val personTtag: TypeTag[Person] = {
  val personTtag = null
  tb.eval(q"org.apache.spark.sql.catalyst.ScalaReflection.universe.typeTag[${weakTypeOf[Person]}]")
    .asInstanceOf[TypeTag[Person]]
}

scala.tools.reflect.ToolBoxError: reflective toolbox failed due to unresolved free type variables

https://gist.github.com/DmytroMitin/6e35c0332f845fcd227d35ec49d4122f

This is how Encoder[T] is defined for a T that has a TypeTag:

implicit def newProductEncoder[T <: Product : TypeTag]: Encoder[T] = Encoders.product[T]
object Encoders {
  def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
}
object ExpressionEncoder {
  def apply[T : TypeTag](): ExpressionEncoder[T] = {
    val mirror = ScalaReflection.mirror
    val tpe = typeTag[T].in(mirror).tpe
    val cls = mirror.runtimeClass(tpe)
    val serializer = ScalaReflection.serializerForType(tpe)
    val deserializer = ScalaReflection.deserializerForType(tpe)
    new ExpressionEncoder[T](
      serializer,
      deserializer,
      ClassTag[T](cls)
    )
  }
}

Let's try to modify it for a T that has a WeakTypeTag or a ClassTag instead:

implicit def apply[T: WeakTypeTag /*: ClassTag*/]: Encoder[T] = {
  val tpe = weakTypeTag[T].in(mirror).tpe
  val cls = mirror.runtimeClass(tpe)
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    ClassTag[T](cls)
  )
}

java.lang.NoClassDefFoundError: no Java class corresponding to Person found

https://gist.github.com/DmytroMitin/b58848fa6575b6fab0e9b8285095cc60

// (*)
implicit def apply[T/*: WeakTypeTag*/ : ClassTag]: Encoder[T] = {
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException: Main

https://gist.github.com/DmytroMitin/0c86933f96e136d44fff555295ce01dd

Finally, let Main extend Serializable:

+---+----+
| id|name|
+---+----+
|  0|   0|
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
+---+----+

https://gist.github.com/DmytroMitin/0e9b0bd2ed6237a4a1e1c40d620a9d88
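As a general illustration of why extending Serializable can make a "Task not serializable" error go away, here is a minimal sketch using plain Java serialization, without Spark. It assumes Scala 2.12 or later, where function literals are serializable lambdas. The names ClosureCaptureDemo, PlainOuter, SerializableOuter and trySerialize are made up for this example, and it does not claim to reproduce exactly what Spark captured in the gists above:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical demo object, not part of Spark or of the original post.
object ClosureCaptureDemo {
  class PlainOuter { // does NOT extend Serializable
    val offset = 1
    // Reads a field of the enclosing instance, so the closure captures `this`.
    def addOffset: Int => Int = i => i + offset
  }

  class SerializableOuter extends Serializable {
    val offset = 1
    def addOffset: Int => Int = i => i + offset
  }

  /** Left(message) if Java serialization rejects the value, Right(byte count) otherwise. */
  def trySerialize(value: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(value)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }
}
```

Serializing new PlainOuter().addOffset fails because the captured enclosing instance is not serializable, while the same closure from SerializableOuter goes through. Anything that drags a reference to Main into a Spark task would hit the same rule.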

So (*) is a correct Encoder.


This doesn't seem to work for a generic local Person:

case class Person[T](id: Long, name: String, t: T)

java.lang.UnsupportedOperationException: No Encoder found for Person$1

https://gist.github.com/DmytroMitin/69496ce257fc9a3a7a5fbd004c52dcc0

scala.ScalaReflectionException: free type Person is not a class

https://gist.github.com/DmytroMitin/07bfe954dca677f0a39c06779b94280e


For generic local classes, the encoder should be (using both WeakTypeTag and ClassTag):

implicit def apply[T: WeakTypeTag : ClassTag]: Encoder[T] = {
  val tpe0 = weakTypeTag[T].in(mirror).tpe
  val typeArgs = tpe0/*.dealias*/.typeArgs
  val tpe = mirror.classSymbol(classTag[T].runtimeClass).toType
  val tpe1 = appliedType(tpe.typeConstructor, typeArgs)
  val serializer = ScalaReflection.serializerForType(tpe1)
  val deserializer = ScalaReflection.deserializerForType(tpe1)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

https://gist.github.com/DmytroMitin/08c8f21ffb1427bfa15dd21fbdfb77fa
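The core trick in this encoder is to take the type constructor from the runtime class and re-apply the type arguments known from the WeakTypeTag. Here is a minimal sketch of just that step, using scala-reflect only and the standard Some class instead of a local one. AppliedTypeDemo, rebuild and matchesSomeOfInt are hypothetical names for this illustration:

```scala
import scala.reflect.runtime.universe._

// Hypothetical demo object, not part of Spark or of the original post.
object AppliedTypeDemo {
  private val mirror = runtimeMirror(getClass.getClassLoader)

  /** Takes the type constructor of a runtime class and applies the given type arguments. */
  def rebuild(cls: Class[_], typeArgs: List[Type]): Type = {
    val tycon = mirror.classSymbol(cls).toType.typeConstructor
    appliedType(tycon, typeArgs)
  }

  // Rebuild Some[Int] from the erased class Some plus the argument Int.
  val someOfInt: Type = rebuild(classOf[Some[_]], List(typeOf[Int]))

  /** Checks that the rebuilt type is equivalent to the statically known one. */
  def matchesSomeOfInt: Boolean = someOfInt =:= typeOf[Some[Int]]
}
```

In the encoder above the class comes from classTag[T].runtimeClass and the arguments from weakTypeTag[T], which is why it needs both context bounds.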


This doesn't work for a generic local class whose type argument is itself a generic local class:

val df: Dataset[Person[Person[Int]]] =
  spark.range(10).map(i => Person(i, i.toString, Person(i, i.toString, i.toInt)))

scala.ScalaReflectionException: free type Person is not a class

https://gist.github.com/DmytroMitin/5bceb2b81f2391c5c312a045edb827a8


An improved version of the encoder:

// A tree of class tags: the type constructor plus the trees of its type arguments
case class Application(tycon: ClassTag[_], targs: List[Application])
class DeepClassTag[T](val classTags: Application)
object DeepClassTag {
  def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]
  implicit def deepClassTag0[A: ClassTag]: DeepClassTag[A] =
    new DeepClassTag(Application(classTag[A], List()))
  implicit def deepClassTag11[A[_], B1](implicit tycon: ClassTag[A[_]], dct1: DeepClassTag[B1]): DeepClassTag[A[B1]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags)))
  // dct2 must describe B2, the second type argument
  implicit def deepClassTag12[A[_,_], B1, B2](implicit tycon: ClassTag[A[_,_]], dct1: DeepClassTag[B1], dct2: DeepClassTag[B2]): DeepClassTag[A[B1, B2]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags, dct2.classTags)))
  // ...
  implicit def deepClassTag2[A[_[_]], B1[_]](implicit tycon: ClassTag[A[B1]], dct1: DeepClassTag[B1[_]]): DeepClassTag[A[B1]] =
    new DeepClassTag(Application(tycon, List(dct1.classTags)))
  // ...
}
def improveStaticType[T: WeakTypeTag : DeepClassTag]: Type =
  improveDynamicType(weakTypeOf[T], DeepClassTag[T].classTags)
// Walks the type and the class-tag tree in parallel, replacing free types at every level
def improveDynamicType(tpe: Type, classTags: Application): Type = {
  val newTycon = improveFreeType(tpe, classTags.tycon.runtimeClass)
  val targs = tpe.dealias.typeArgs
  assert(targs.length == classTags.targs.length, s"( $targs ).length == ( ${classTags.targs} ).length")
  val newArgs = targs.zip(classTags.targs).map((improveDynamicType _).tupled)
  appliedType(newTycon, newArgs)
}
// Replaces a free type with the type of the corresponding runtime class
def improveFreeType(tpe: Type, cls: Class[_]): Type =
  if (internal.isFreeType(tpe.typeSymbol)) {
    val typeArgs = tpe.dealias.typeArgs
    val typeConstructor = mirror.classSymbol(cls).toType.typeConstructor
    appliedType(typeConstructor, typeArgs)
  } else tpe
implicit def enc[T: WeakTypeTag : ClassTag : DeepClassTag]: Encoder[T] = {
  val tpe = improveStaticType[T]
  val serializer = ScalaReflection.serializerForType(tpe)
  val deserializer = ScalaReflection.deserializerForType(tpe)
  new ExpressionEncoder[T](
    serializer,
    deserializer,
    classTag[T]
  )
}

https://gist.github.com/DmytroMitin/56044515e031fcf1e977ab213013861d
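To show what kind of data DeepClassTag carries, here is a small sketch that builds an Application tree by hand and prints it. The Application class mirrors the definition above; ApplicationTreeDemo, render and tree are names invented for this illustration, and standard library classes stand in for local ones:

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical demo object, not part of Spark or of the original post.
object ApplicationTreeDemo {
  // Same shape as the Application class in the answer.
  final case class Application(tycon: ClassTag[_], targs: List[Application])

  /** Renders the tree using the simple names of the runtime classes. */
  def render(app: Application): String = {
    val name = app.tycon.runtimeClass.getSimpleName
    if (app.targs.isEmpty) name
    else app.targs.map(render).mkString(s"$name[", ", ", "]")
  }

  // Hand-built tree for Option[List[Int]]: one ClassTag per nesting level.
  val tree: Application =
    Application(
      classTag[Option[_]],
      List(Application(classTag[List[_]], List(Application(classTag[Int], Nil))))
    )
}
```

A single ClassTag[Option[List[Int]]] would only know about Option because of erasure. Keeping one tag per level is what lets improveDynamicType replace free types at any depth.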


DeepClassTag doesn't seem to work for higher-kinded classes:

https://gist.github.com/DmytroMitin/6388a437507e8389f30230e08382d9ff

An improved version, which still doesn't always work (there are too many shapes of type constructors):

https://gist.github.com/DmytroMitin/2625ee20695404c6fc118ab8680808f2


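Instead of defining type class instances by hand for each shape of type constructor, the type class DeepClassTag can be defined with a macro, as follows: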

import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.macros.whitebox
case class Application(tycon: ClassTag[_], targs: List[Application])
class DeepClassTag[T](val classTags: Application)
object DeepClassTag {
  def apply[T: DeepClassTag]: DeepClassTag[T] = implicitly[DeepClassTag[T]]
  implicit def mkDeepClassTag[T]/*(implicit tCtag: ClassTag[T])*/: DeepClassTag[T] =
    macro DeepClassTagMacros.mkDeepClassTagImpl[T]
}
class DeepClassTagMacros(val c: whitebox.Context) {
  import c.universe._
  def findInstance[TC[_]](tpe: Type)(implicit wttag: WeakTypeTag[TC[_]]): Tree =
    c.inferImplicitValue(
      appliedType(weakTypeOf[TC[_]].typeConstructor, tpe),
      silent = false
    )
  def mkDeepClassTagImpl[T: WeakTypeTag]/*(tCtag: c.Tree)*/ : Tree = {
    val T = weakTypeOf[T]
    val tCtag = findInstance[ClassTag](T)
    val targCtags = T.dealias.typeArgs.map(arg => {
      val argInst = findInstance[DeepClassTag](arg)
      q"$argInst.classTags"
    })
    val targClassTags = q"_root_.scala.List.apply[Application](..$targCtags)"
    q"new DeepClassTag[$T](Application($tCtag, $targClassTags))"
  }
}


My PR to Spark to support local classes: https://github.com/apache/spark/pull/38740
