如何在Spark SQL中定义自定义类型的模式



下面的示例代码尝试将一些case对象放入数据框架中。该代码包括使用以下特征定义的case对象层次结构和case类:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

在执行代码时,我不幸遇到了以下异常:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

  • 是否有可能为某些类型(这里是类型Some)添加或定义模式?
  • 是否存在另一种方法来表示这种枚举?
    • 我尝试直接使用Enumeration,但也没有成功。(见下文)

Enumeration代码:

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

提前感谢。我希望,最好的方法是不要使用字符串。

火花2.0.0 + :

UserDefinedType在Spark 2.0.0中被设为私有,目前还没有Dataset友好的替代品。

见:Spark -14155(隐藏UserDefinedType在Spark 2.0)

大多数情况下,静态类型的Dataset可以作为替代Jira SPARK-7768将在目标版本2.4中再次公开UDT API。

参见如何在数据集中存储自定义对象?

火花& lt;2.0.0

是否有可能为某些类型(这里是Some类型)添加或定义模式?

我想答案取决于你有多需要这个。看起来可以创建UserDefinedType,但它需要访问DeveloperApi,并且不完全直接或文档齐全。

import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some
class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType
  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }
  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }
  override def userClass: Class[Some] = classOf[Some]
}

您可能也应该重写hashCodeequals

对应的PySpark代码如下所示:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType
class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()
    @classmethod
    def module(cls):
        return cls.__module__
    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'
    def serialize(self, obj):
        return obj.value
    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]
@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

In Spark <1.5 Python UDT需要一个配对的Scala UDT,但在1.5中似乎不再是这样了。

对于简单的UDT,您可以使用简单类型(例如IntegerType而不是整个Struct)。

相关内容

  • 没有找到相关文章

最新更新