如何使用 Apache Spark/PySpark 获取 3 个最小的唯一行大 csv(>1000 万行)文件?



我是来自波兰的博士生。我有一个关于Apache Spark/Pyspark 2的问题。如何使用 Apache Spark/PySpark 2 获得 3 行最小的唯一(唯一文本,而不是长度)的大 csv 文件(>1000 万行)?

数据.csvCSV 文件示例:

name,id
abc,1
abcd,2
abcde,3
ab,4
ab,4

1 获取数据框中每个唯一行的长度列表:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
df = sql_context.read.csv(
path="/home/rkorniichuk/data.csv", sep=',', encoding="UTF-8",
quote='"', escape='"', header=True, inferSchema=True,
ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=False,
mode="FAILFAST")
def get_row_lenght(row):
lenght = 0
for column in row:
lenght += len(str(column))
return lenght`
rows_lenght_list = [df.foreach(get_row_lenght)]`
>>> rows_length_list
>>> [None]

我们这里有一个问题,因为我想rows_length_list值填充为[4, 5, 6, 3, 3].

2 排序rows_length_list

rows_length_list.sort()
>>> rows_length_list
>>> [3, 4, 5, 6]

3 获取 csv 文件示例行的最大值:

>>> rows_length_list[3-1]
>>> 5

4 获取长度为 <=5 个字符的 3 个样本:

abc,1 # TRUE
abcd,2 # TRUE
abcde,3 # FALSE
ab,4 # TRUE and BREAK
ab,4

我只能通过数据帧(没有 SQL 请求)来实现吗?

您可以使用concat()将所有列连接成一个字符串,括在length()内以计算生成的新变量的长度:

from pyspark.sql.functions import concat, length, col
df.withColumn("row_len", length(concat(*df.columns))) 
.filter(col("row_len") <= 5) 
.dropDuplicates() 
.sort("row_len") 
.show()
+----+---+-------+
|name| id|row_len|
+----+---+-------+
|  ab|  4|      3|
| abc|  1|      4|
|abcd|  2|      5|
+----+---+-------+

如果您有超过3行,您可以使用.take(3)to 而不是.show()来获取具有最小row_len的 3 个唯一行。

最新更新