我是Apache Spark(和Scala)的新手,我想在读取csv文件并将其加载到DF上后立即应用一个简单的sql请求,而无需创建额外的数据帧或临时视图或表。
这是初始请求:
SELECT DISTINCT city from cities
WHERE id IN ("10", "20")
AND year IN ("2017", "2018")
这是我在 Scala 上尝试过的:
val cities = spark.read.options(Map("header" -> "true", "delimiter" -> ";")).csv("test.csv").select("city").distinct.where(""" id IN ("10", "20") AND year IN ("2017", "2018")"""))
cities.show(20)
但它不起作用。具体来说,出现问题似乎是因为它无法识别数据帧中的其他两列(因为我之前只选择了一列)。因此,我必须首先选择这三列,然后保存一个临时表(视图),然后在新数据帧中选择所需的列。我觉得这种方法太长太重了。
你能帮我解决这个问题吗???谢谢!
您的解决方案几乎是正确的,您只需要将 where
语句移动到select(..).distinct
之前:
val cities = spark.read
.options(Map("header" -> "true", "delimiter" -> ";"))
.csv("test.csv")
.where($"id".isin("10", "20") and $"year".isin("2017", "2018"))
.select("city").distinct
Spark scala API 比声明式(与 SQL 不同)更势在必行,这就是为什么在你select("city")
之后你丢失了数据帧中的所有其他字段。 以及为什么,正如其他人指出的那样,您应该在进行选择之前过滤/在哪里。这有点令人困惑,因为Scala DSL在语法上类似于SQL
正如sramalingam24和Raphael Roth所提到的,在从DataFrame中选择必填字段之前,必须应用where
。过滤器,两者给出相同的结果,如下所示。dropDuplicates() 将删除 city 列中的 Duplicates。
val cities = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
.csv("test.csv")
.filter($"id".isin("10", "20") and $"year".isin("2017", "2018"))
.select("city")
.dropDuplicates()