我正在做一个使用 Spark SQL 1.4 的项目。看起来该版本不支持ANY
和ALL
功能。
因此,可以为这些功能编写UDFs
,或者我们是否有其他解决方法。
谢谢
这样的事情应该可以解决问题:
import scala.reflect.ClassTag
import org.apache.spark.sql.Column
type BinColOp = (Column, Column) => Column
def check[T](f: BinColOp)(
col: Column, pred: (Column, T) => Column, xs: Seq[T]) = {
xs.map(other => pred(col, other)).reduce(f)
}
val all = check[Column] (_ && _) _
val any = check[Column] (_ || _) _
用法示例:
val df = sc.parallelize(
(1L, "foo", 3.6) ::
(2L, "bar", -1.0) ::
Nil
).toDF("v", "x", "y")
df.select(all($"v", _ > _, Seq(lit(-1), lit(1)))).show
// +---------------------+
// |((v > -1) && (v > 1))|
// +---------------------+
// | false|
// | true|
// +---------------------+
df.select(any($"x", _ !== _, Seq(lit("foo"), lit("baz")))).show
// +--------------------------------+
// |(NOT (x = foo) || NOT (x = baz))|
// +--------------------------------+
// | true|
// | true|
// +--------------------------------+
df.select(all($"x", _ !== _, Seq(lit("foo"), lit("baz")))).show
// +--------------------------------+
// |(NOT (x = foo) && NOT (x = baz))|
// +--------------------------------+
// | false|
// | true|
// +--------------------------------+