R-有效计算宽火花DF的行总计



我有几千列的宽火花数据框,大约一百万行,我想为此计算行总数。到目前为止,我的解决方案在下面。我用了:dplyr-使用正则表达式的多个列的总和和https://github.com/tidyverse/rlang/issues/116

library(sparklyr)
library(DBI)
library(dplyr)
library(rlang)
sc1 <- spark_connect(master = "local")
wide_df = as.data.frame(matrix(ceiling(runif(2000, 0, 20)), 10, 200))
wide_sdf = sdf_copy_to(sc1, wide_df, overwrite = TRUE, name = "wide_sdf")
col_eqn = paste0(colnames(wide_df), collapse = "+" )
# build up the SQL query and send to spark with DBI
query = paste0("SELECT (",
               col_eqn,
               ") as total FROM wide_sdf")
dbGetQuery(sc1, query)
# Equivalent approach using dplyr instead
col_eqn2 = quo(!! parse_expr(col_eqn))
wide_sdf %>% 
    transmute("total" := !!col_eqn2) %>%
        collect() %>%
            as.data.frame()

当列数增加时,问题就会出现。在Spark SQL上,它似乎是一次计算的一个元素,即(((((V1 V1) V3) V4)...)这导致由于很高的递归而导致错误。

有人有一种替代性更有效的方法吗?任何帮助将不胜感激。

您在这里不幸。一种或另一种方式,您将要达到一些递归限制(即使您绕过SQL解析器,足够大的表达式也会崩溃查询计划者)。有一些缓慢的解决方案:

  • 使用 spark_apply(以t和r的转换为代价):

    wide_sdf %>% spark_apply(function(df) { data.frame(total = rowSums(df)) })
    
  • 转换为长格式和汇总(以explode和Shuffle为代价):

    key_expr <- "monotonically_increasing_id() AS key"
    value_expr <- paste(
     "explode(array(", paste(colnames(wide_sdf), collapse=","), ")) AS value"
    )
    wide_sdf %>% 
      spark_dataframe() %>% 
      # Add id and explode. We need a separate invoke so id is applied
      # before "lateral view"
      sparklyr::invoke("selectExpr", list(key_expr, "*")) %>% 
      sparklyr::invoke("selectExpr", list("key", value_expr)) %>% 
      sdf_register() %>% 
      # Aggregate by id
      group_by(key) %>% 
      summarize(total = sum(value)) %>% 
      arrange(key)
    

要获得更有效的东西,您应该考虑将Scala扩展名和直接应用于Row对象,而无需爆炸:

package com.example.sparklyr.rowsum
import org.apache.spark.sql.{DataFrame, Encoders}
object RowSum {
  def apply(df: DataFrame, cols: Seq[String]) = df.map {
    row => cols.map(c => row.getAs[Double](c)).sum
  }(Encoders.scalaDouble)
}

invoke_static(
  sc, "com.example.sparklyr.rowsum.RowSum", "apply",
  wide_sdf %>% spark_dataframe
) %>% sdf_register()

相关内容

  • 没有找到相关文章

最新更新