Spark 版本 - 2.2.1.
我创建了一个包含 64 个存储桶的存储桶表,我正在执行一个聚合函数select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa
.我可以看到 Spark UI 中有 64 个任务,其中 20 个任务仅使用 4 个执行器(每个执行器有 16 个内核)。有没有办法横向扩展任务数,或者这就是存储桶查询的运行方式(正在运行的核心数作为存储桶数)?
下面是创建表:
sql("""CREATE TABLE level_1 (
bundle string,
date_ date,
hour SMALLINT)
USING ORC
PARTITIONED BY (date_ , hour )
CLUSTERED BY (ifa)
SORTED BY (ifa)
INTO 64 BUCKETS
LOCATION 'XXX'""")
下面是查询:
sql(s"select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa").show
使用存储桶时,任务数 == 存储桶数,因此您应该知道您需要/想要使用的核心/任务数,然后将其设置为桶数。
任务数=桶数可能是Spark中分桶最重要和讨论不足的方面。存储桶(默认情况下)过去仅用于创建可以优化大型联接的"预洗牌"数据帧。当您读取存储桶表时,每个存储桶的所有文件都由单个 Spark 执行器读取(读取数据时 30 个存储桶 = 30 个 Spark 任务),这将允许该表连接到在相同 # 列上存储的另一个表。我发现这种行为很烦人,并且像上面提到的用户一样,对于可能增长的表来说存在问题。
您现在可能会问自己,为什么以及何时我想存储,以及我的真实数据何时会随着时间的推移以完全相同的方式增长?(老实说,您可能按日期对大数据进行了分区)根据我的经验,您可能没有一个很好的用例来以默认的 spark 方式存储桶表。但一切都不会因为桶而丢失!
输入"桶修剪"。存储桶修剪仅在对一列进行存储桶时有效,但自 SparkSQL 和数据帧出现以来,它可能是您在 Spark 中最好的朋友。它允许 Spark 根据查询中的某个过滤器确定表中的哪些文件包含特定值,这可以大大减少 Spark 物理读取的文件数量,从而实现非常高效和快速的查询。(我将 2+hr 查询减少到 2 分钟和 Spark 工作人员的 1/100)。但是您可能不在乎,因为 # 个存储桶到任务的问题意味着,如果每个存储桶、每个分区的文件太多,您的表将永远不会"纵向扩展"。
输入 Spark 3.2.0。即将推出一项新功能,该功能将允许在禁用基于存储桶的读取时保持活动状态,从而允许您通过存储桶修剪/扫描分发 Spark 读取。我还有一个技巧可以用 Spark <3.2 做到这一点,如下所示。 (请注意,在 S3 上使用 vanilla Spark.Read 对文件的叶扫描会增加开销,但如果您的表很大,没关系,BC 您的存储桶优化表将是跨所有可用 Spark 工作线程的分布式读取,现在可扩展)
val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD,FilePartition}
val df = spark.table(tablename).where((col(partition_col)===lit(target_partition)) && (col(bucket_col)===lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
val bucket_files = for
{ FilePartition(bucketId, files) <- rdd.filePartitions f <- files }
yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("
.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))