I have a wide Spark data frame with a few thousand columns and roughly a million rows, and I want to compute the row totals for it. My solution so far is below. I used: dplyr - sum of multiple columns using regular expressions and https://github.com/tidyverse/rlang/issues/116
library(sparklyr)
library(DBI)
library(dplyr)
library(rlang)
sc1 <- spark_connect(master = "local")
wide_df = as.data.frame(matrix(ceiling(runif(2000, 0, 20)), 10, 200))
wide_sdf = sdf_copy_to(sc1, wide_df, overwrite = TRUE, name = "wide_sdf")
col_eqn = paste0(colnames(wide_df), collapse = "+" )
# build up the SQL query and send to spark with DBI
query = paste0("SELECT (",
col_eqn,
") as total FROM wide_sdf")
dbGetQuery(sc1, query)
# Equivalent approach using dplyr instead
col_eqn2 = quo(!! parse_expr(col_eqn))
wide_sdf %>%
  transmute("total" := !!col_eqn2) %>%
  collect() %>%
  as.data.frame()
The problem appears when the number of columns grows. Spark SQL seems to evaluate the expression one element at a time, i.e. ((((V1 + V2) + V3) + V4) ...), and this fails with an error because the recursion gets very deep.
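For a rough sense of scale (a sketch with an assumed column count, not my real data): the string sent to Spark SQL is one flat chain of additions, so the parser ends up building an expression tree roughly as deep as the number of columns.
n_cols = 3000                                # assumed column count, for illustration only
cols = paste0("V", seq_len(n_cols))
expr_string = paste(cols, collapse = " + ")
nchar(expr_string)                           # length of the string handed to the SQL parser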
Does anyone have an alternative, more efficient approach? Any help would be much appreciated.
You are out of luck here. One way or another you are going to hit some recursion limits (even if you go around the SQL parser, a sufficiently large expression will crash the query planner). There are some slow solutions available (a quick sanity check on the example data is sketched after the list):
- Use spark_apply (at the cost of a conversion to and from R):

  wide_sdf %>%
    spark_apply(function(df) {
      data.frame(total = rowSums(df))
    })
- Convert to long format and aggregate (at the cost of explode and a shuffle):

  key_expr <- "monotonically_increasing_id() AS key"

  value_expr <- paste(
    "explode(array(", paste(colnames(wide_sdf), collapse=","), ")) AS value"
  )

  wide_sdf %>%
    spark_dataframe() %>%
    # Add id and explode. We need a separate invoke so id is applied
    # before "lateral view"
    sparklyr::invoke("selectExpr", list(key_expr, "*")) %>%
    sparklyr::invoke("selectExpr", list("key", value_expr)) %>%
    sdf_register() %>%
    # Aggregate by id
    group_by(key) %>%
    summarize(total = sum(value)) %>%
    arrange(key)
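As a quick sanity check on the small example data (a sketch only; it assumes row order is preserved, which holds for this locally copied data frame):

local_totals <- rowSums(wide_df)   # reference result computed locally in R

# spark_apply variant, collected back into R
spark_totals <- wide_sdf %>%
  spark_apply(function(df) data.frame(total = rowSums(df))) %>%
  collect()

all.equal(as.numeric(local_totals), spark_totals$total)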
To get something more efficient, you should consider writing a Scala extension and applying the sum directly on the Row objects, without exploding:
package com.example.sparklyr.rowsum
import org.apache.spark.sql.{DataFrame, Encoders}
object RowSum {
  // Sum the given columns of every Row, returning a Dataset[Double]
  def apply(df: DataFrame, cols: Seq[String]) = df.map {
    row => cols.map(c => row.getAs[Double](c)).sum
  }(Encoders.scalaDouble)
}
and
invoke_static(
  sc, "com.example.sparklyr.rowsum.RowSum", "apply",
  # pass the column names as well, to match RowSum.apply(df, cols)
  wide_sdf %>% spark_dataframe(), colnames(wide_sdf)
) %>% sdf_register()
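For the invoke_static call to work, the compiled class has to be on the session's classpath. A minimal sketch of one way to do that, assuming the Scala object above has been compiled into a jar (the path below is hypothetical):

library(sparklyr)

config <- spark_config()
# Hypothetical location of the jar built from the RowSum object above
config$sparklyr.jars.default <- "/path/to/rowsum.jar"

sc <- spark_connect(master = "local", config = config)

In a packaged sparklyr extension the jar would normally be registered through spark_dependency() instead, but the config option above is enough for a quick test.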