我正在使用PySpark,我想要一个简单的方法来做下一个过程,而不是过于复杂。
假设我有一个这样的表:
您可以尝试这样做:
import pyspark.sql.functions as F
data1 = [
[1, "a,b,c,d"],
[2, "b,d,b"],
[3, "c,y,u"],
]
df = spark.createDataFrame(data1).toDF("ID", "Letters")
dfWithDistinctValues = df.select(
F.array_distinct(
F.flatten(F.collect_set(F.array_distinct(F.split(df.Letters, ","))))
).alias("unique_letters")
)
defaultValues = [
data[0] for data in dfWithDistinctValues.select("unique_letters").collect()
]
print(defaultValues)
这里发生了什么:
首先我用","使用f.c re_distinct
删除行级重复项我使用collect_set将所有不同的数组放到一行中在这个阶段是数组的数组它看起来像这样:
[[b, d], [a, b, c, d], [c, y, u]]
然后我使用flatten将所有值作为单独的字符串:
[b, d, a, b, c, d, c, y, u]
仍然有一些重复的被array_distinct删除,所以在最后输出看起来像这样:
[b, d, a, c, y, u]
如果你需要同样的计数,你可以像Koedit提到的那样使用explosion,你可以把他的部分代码修改成这样:
# Unique letters with counts
uniqueLettersDf = (
df.select(explode(array_distinct("Letters")).alias("Letter"))
.groupBy("Letter")
.count()
.show()
)
现在你将得到如下内容:
+------+-----+
|Letter|count|
+------+-----+
| d| 2|
| c| 2|
| b| 2|
| a| 1|
| y| 1|
| u| 1|
+------+-----+
根据你的数据集和数组的大小(如果它们非常大,这可能不是你想要的路线),你可以使用explode
函数来轻松地获得你想要的:
from pyspark.sql.functions import explode
df = spark.createDataFrame(
[
(1, ["a", "b", "c", "d"]),
(2, ["b", "d", "b"]),
(3, ["c", "y", "u"])
],
["ID", "Letters"]
)
# Creating a dataframe with 1 column, "letters", with distinct values per row
uniqueLettersDf = df.select(explode("Letters").alias("letters")).distinct()
# Using list comprehension and the .collect() method to turn our dataframe into a Python list
output = [row['letters'] for row in uniqueLettersDf.collect()]
output
['d', 'c', 'b', 'a', 'y', 'u']
编辑:为了使它更安全,我们可以在使用explode
之前使用array_distinct
:这将限制在爆炸之前通过移除双精度数而产生的行数。
代码将是相同的,除了这些行:
from pyspark.sql.functions import explode, array_distinct
...
uniqueLettersDf = df.select(explode(array_distinct("Letters")).alias("letters")).distinct()
...