我正在尝试编写一个Scala函数,可以根据提供的输入字符串推断Spark数据类型:
/**
* Example:
* ========
* toSparkType("string") => StringType
* toSparkType("boolean") => BooleanType
* toSparkType("date") => DateType
* etc.
*/
def toSparkType(inputType : String) : DataType = {
var dt : DataType = null
if(matchesStringRegex(inputType)) {
dt = StringType
} else if(matchesBooleanRegex(inputType)) {
dt = BooleanType
} else if(matchesDateRegex(inputType)) {
dt = DateType
} else if(...) {
...
}
dt
}
我的目标是支持可用DataTypes
的一个大子集,如果不是全部的话。当我开始实现这个函数时,我开始思考:"Spark/Scala可能已经有一个助手/util方法来为我做这件事。"毕竟,我知道我可以这样做:
var structType = new StructType()
structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType.add("some_new_date_col", "date", true, Metadata.empty)
Scala和/或Spark将隐式地将我的"string"
参数转换为StringType
,等等。所以我问:我可以用Spark或Scala做什么神奇的事情来帮助我实现我的转换器方法?
Spark/Scala可能已经有一个助手/util方法来为我做这件事。
你是对的。Spark已经有了自己的模式和数据类型推断代码,它用来从底层数据源(csv, json等)推断模式,所以你可以看看实现你自己的(实际实现被标记为Spark私有,并与RDD和内部类绑定,所以它不能直接从Spark外部的代码中使用,但应该给你一个好主意如何去做。)
考虑到csv是平面类型(json可以有嵌套结构),csv模式推断相对更直接,应该可以帮助您完成上面要实现的任务。所以我将解释csv推理是如何工作的(json推理只需要考虑可能的嵌套结构,但数据类型推理非常类似)。
有了这个开场白,你要看的是CSVInferSchema对象。特别是,查看infer
方法,它接受一个RDD[Array[String]]
,并推断整个RDD中数组的每个元素的数据类型。它的方法是——它将每个字段标记为NullType
开始,然后当它迭代RDD
中的下一行值(Array[String]
)时,如果新的DataType
更具体,它会将已经推断的DataType
更新为新的DataType
。
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
现在inferRowType
对行中的每个字段调用inferField
。inferField
实现是你可能在寻找的——到目前为止,它对一个特定的字段和当前行字段的字符串值作为参数进行类型推断。然后,它返回现有的推断类型,或者如果推断的新类型比新类型更具体。
代码的相关部分如下:
typeSoFar match {
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
}
请注意,如果typeSoFar
是NullType,那么它首先尝试将其解析为Integer
,但tryParseInteger
调用是对较低类型解析的调用链。因此,如果它不能将值解析为Integer,那么它将调用tryParseLong
,如果失败将调用tryParseDecimal
,如果失败将调用tryParseDouble
w.o.f.w.i tryParseTimestamp
w.o.f.w.i tryParseBoolean
w.o.f.w.i.最后stringType
。
所以你可以使用非常相似的逻辑来实现你的用例。(如果您不需要跨行合并,那么您只需逐字实现所有tryParse*
方法并简单地调用tryParseInteger
。无需编写自己的正则表达式)
是的,Spark当然有你需要的魔力。
在Spark 2。它是CatalystSqlParser
对象,在这里定义。
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
CatalystSqlParser.parseDataType("string") // StringType
CatalystSqlParser.parseDataType("int") // IntegerType
以此类推。
但据我所知,它不是公共API的一部分,因此可能在下一个版本中更改而没有任何警告。
所以你可以实现你的方法为:
def toSparkType(inputType: String): DataType = CatalystSqlParser.parseDataType(inputType)
从scala中,你似乎不能神奇地做你想做的事情,看看下面的例子:
import com.scalakata._
@instrument class Playground {
val x = 5
def f[T](v: T) = v
f(x)
val y = "boolean"
f(y)
def manOf[T: Manifest](t: T): Manifest[T] = manifest[T]
println(manOf(y))
}
我想在运行时获得变量的类型。
现在从spark开始,因为我现在没有安装,我不能写一个例子,但是没有什么明显的可以使用,所以我建议你继续写toSparkType()
,因为你已经开始了,但是先看看pyspark.sql.types的源代码。
问题是你总是传递一个字符串
如果您将字符串文字写成数据类型名称,即:"StringType","IntegerType"使用这个函数-
def StrtoDatatype(str: String): org.apache.spark.sql.types.DataType = {
val m = ru.runtimeMirror(getClass.getClassLoader)
val module = m.staticModule(s"org.apache.spark.sql.types.$str")
m.reflectModule(module).instance.asInstanceOf[org.apache.spark.sql.types.DataType]
}
如果你有字符串字面值- string, int等
def sqlStrtoDatatype(str: String): org.apache.spark.sql.types.DataType = {
CatalystSqlParser.parseDataType(str)
}