Apache Spark SQL标识符应为异常



我的问题与此问题非常相似:Apache Spark SQL问题:java.lang.RuntimeException:[1.517]失败:需要标识符。但我只是不知道我的问题在哪里。我使用SQLite作为数据库后端。连接语句和简单的select语句可以很好地工作。

违规线路:

val df = tableData.selectExpr(tablesMap(t).toSeq:_*).map(r => myMapFunc(r))

tablesMap包含作为键的表名和作为表达式的字符串数组。打印后,阵列看起来像这样:

WrappedArray([My Col A], [ColB] || [Col C] AS ColB)

表名也包含在方括号中,因为它包含空格。我得到的例外:

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: identifier expected

我已经确保不使用任何Spark Sql关键字。在我看来,这段代码失败的原因可能有两个:1)我不知怎么处理错了列名中的空格。2) 我处理串联错误。

我使用的是一个类似CSV的资源文件,其中包含我希望在表上进行求值的表达式。除了这个文件之外,我希望允许用户在运行时指定其他表及其相应的列表达式。文件如下:

TableName,`Col A`,`ColB`,CONCAT(`ColB`, ' ', `Col C`)

这显然不起作用。尽管如此,我还是想重用这个文件,当然要修改。我的想法是将带有表达式的列从一个字符串数组映射到一个火花列序列,就像现在一样。(这是我能想到的唯一解决方案,因为我想避免只为这一项功能引入所有的配置单元依赖项。)我将为表达式引入一个小语法,用$concatas等函数的一些关键字标记原始列名。但我怎么能做到这一点呢?我试过这样的东西,但它离编译还很远。

def columnsMapFunc( expr: String) : Column = {
    if(expr(0) == '$')
        return expr.drop(1)
    else
        return concat(extractedColumnNames).as(newName)
}

一般来说,使用包含空格的名称会带来问题,但用反引号替换方括号应该可以解决问题:

val df = sc.parallelize(Seq((1,"A"), (2, "B"))).toDF("f o o", "b a r")
df.registerTempTable("foo bar")
df.selectExpr("`f o o`").show
// +-----+
// |f o o|
// +-----+
// |    1|
// |    2|
// +-----+
sqlContext.sql("SELECT `b a r` FROM `foo bar`").show
// +-----+
// |b a r|
// +-----+
// |    A|
// |    B|
// +-----+

对于串联,您必须使用concat函数:

df.selectExpr("""concat(`f o o`, " ", `b a r`)""").show
// +----------------------+
// |'concat(f o o, ,b a r)|
// +----------------------+
// |                   1 A|
// |                   2 B|
// +----------------------+

但它需要Spark 1.4.0中的CCD_ 5。

在实践中,我会在加载数据后简单地重命名列

df.toDF("foo", "bar")
// org.apache.spark.sql.DataFrame = [foo: int, bar: string]

并使用函数而不是表达式字符串(concat函数仅在Spark>=1.5.0中可用,对于1.4及更早版本,您需要一个UDF):

import org.apache.spark.sql.functions.concat
df.select($"f o o", concat($"f o o", lit(" "), $"b a r")).show
// +----------------------+
// |'concat(f o o, ,b a r)|
// +----------------------+
// |                   1 A|
// |                   2 B|
// +----------------------+

还有一个concat_ws函数,它将分隔符作为第一个参数:

df.selectExpr("""concat_ws(" ", `f o o`, `b a r`)""")
df.select($"f o o", concat_ws(" ", $"f o o", $"b a r"))

相关内容

最新更新