Spark SQL中的阵列相交



我有一个带有数组类型列的表,名为 writer,其具有诸如 array[value1, value2]array[value2, value3] ....等的值。

我正在做self join以获取在数组之间具有共同值的结果。我尝试了:

sqlContext.sql("SELECT R2.writer FROM table R1 JOIN table R2 ON R1.id != R2.id WHERE ARRAY_INTERSECTION(R1.writer, R2.writer)[0] is not null ")

sqlContext.sql("SELECT R2.writer FROM table R1 JOIN table R2 ON R1.id != R2.id WHERE ARRAY_INTERSECT(R1.writer, R2.writer)[0] is not null ")

但有同样的例外:

线程" main" org.apache.spark.sql.sql.analysisexception中的例外: 未定义的功能:" array_intersect"。这个功能既不是 注册的临时功能或永久函数注册 数据库"默认"。第1行POS 80

SPARK SQL可能不支持ARRAY_INTERSECTIONARRAY_INTERSECT。如何在Spark SQL中实现目标?

因为SPARK 2.4 array_intersect功能可以直接在SQL

中使用
spark.sql(
  "SELECT array_intersect(array(1, 42), array(42, 3)) AS intersection"
).show()
+------------+
|intersection|
+------------+
|        [42]|
+------------+

Dataset API:

import org.apache.spark.sql.functions.array_intersect
Seq((Seq(1, 42), Seq(42, 3)))
  .toDF("a", "b")
  .select(array_intersect($"a", $"b") as "intersection")
  .show()
+------------+
|intersection|
+------------+
|        [42]|
+------------+

等效函数也存在于其他语言中:

  • pyspark.sql.functions.array_intersect in pyspark。
  • SparkR::array_intersect在Sparkr。

您需要一个UDF:

import org.apache.spark.sql.functions.udf
spark.udf.register("array_intersect", 
  (xs: Seq[String], ys: Seq[String]) => xs.intersect(ys))

,然后检查交叉点是否为空:

scala> spark.sql("SELECT size(array_intersect(array('1', '2'), array('3', '4'))) = 0").show
+-----------------------------------------+
|(size(UDF(array(1, 2), array(3, 4))) = 0)|
+-----------------------------------------+
|                                     true|
+-----------------------------------------+

scala> spark.sql("SELECT size(array_intersect(array('1', '2'), array('1', '4'))) = 0").show
+-----------------------------------------+
|(size(UDF(array(1, 2), array(1, 4))) = 0)|
+-----------------------------------------+
|                                    false|
+-----------------------------------------+

相关内容

  • 没有找到相关文章

最新更新