为什么Apache Spark分区CSV根据文件大小读取,以及如何更改分区



这是我的pyspark代码:

csv_file = "/FileStore/tables/mnt/training/departuredelays02.csv"
schema   = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"
df = (spark
.read
.format("csv")                    
.option("header","true")
.schema(schema)
.load(csv_file)                  
)
partitions = df.rdd.getNumPartitions()
print(partitions)

csv文件有487178行。

打印分区后,得到的结果是3个分区。

请注意,我有两个4芯的工人。这意味着总共有8个插槽。

现在,如果我尝试加载以下文件,该文件要大得多,有1391578行:

csv_file = "/FileStore/tables/mnt/training/departuredelays.csv"

我得到一个8的分区。

我的问题是如何强制第一个CSV以与较大文件相同的方式进行分区。我知道可以使用重新分区,但我很好奇,这是否可以在没有任何洗牌的情况下完成?即使我们重新划分它,它似乎也会创建一个包含3个任务而不是8个任务的作业。

以下是我运行以下代码片段后得到的内容:

df = df.repartition(8)
print(df.count())

第一个任务的第一阶段仍然分配有3个任务。

输出:

(3) Spark Jobs
Job 93 View(Stages: 1/1)
Stage 123: 3/3
Job 94 View(Stages: 1/1, 1 skipped)
Stage 124: 0/3 skipped
Stage 125: 8/8
Job 95 View(Stages: 1/1, 2 skipped)
Stage 126: 0/3 skipped
Stage 127: 0/8 skipped
Stage 128: 1/1

您可以尝试使用coalesce,它可以进行合理的洗牌,而不是重新分区。

df = spark
.read
.format("csv")                    
.option("header","true")
.schema(schema)
.load(csv_file)                  
.coalesce(8)

查看此以了解更多信息Spark-重新分区((与合并((

最新更新