优化 Spark scala 上的 where 请求



我是Apache Spark(和Scala)的新手,我想在读取csv文件并将其加载到DF上后立即应用一个简单的sql请求,而无需创建额外的数据帧或临时视图或表。

这是初始请求:

SELECT DISTINCT city from cities
WHERE id IN ("10", "20")
AND year IN ("2017", "2018")

这是我在 Scala 上尝试过的:

val cities = spark.read.options(Map("header" -> "true", "delimiter" -> ";")).csv("test.csv").select("city").distinct.where(""" id IN ("10", "20") AND year IN ("2017", "2018")"""))
cities.show(20)

但它不起作用。具体来说,出现问题似乎是因为它无法识别数据帧中的其他两列(因为我之前只选择了一列)。因此,我必须首先选择这三列,然后保存一个临时表(视图),然后在新数据帧中选择所需的列。我觉得这种方法太长太重了。

你能帮我解决这个问题吗???谢谢!

您的解决方案几乎是正确的,您只需要将 where 语句移动到select(..).distinct之前:

val cities = spark.read
  .options(Map("header" -> "true", "delimiter" -> ";"))
  .csv("test.csv")
  .where($"id".isin("10", "20") and $"year".isin("2017", "2018"))
  .select("city").distinct

Spark scala API 比声明式(与 SQL 不同)更势在必行,这就是为什么在你select("city")之后你丢失了数据帧中的所有其他字段。 以及为什么,正如其他人指出的那样,您应该在进行选择之前过滤/在哪里。这有点令人困惑,因为Scala DSL在语法上类似于SQL

正如sramalingam24和Raphael Roth所提到的,在从DataFrame中选择必填字段之前,必须应用where。过滤器,两者给出相同的结果,如下所示。dropDuplicates() 将删除 city 列中的 Duplicates。

    val cities = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
       .csv("test.csv")
       .filter($"id".isin("10", "20") and $"year".isin("2017", "2018"))
       .select("city")
       .dropDuplicates()

最新更新