Spark RDD to CSV-添加空列



我有一个RDD[Map[String,Int]],其中映射的键是列名。每个映射都是不完整的,为了知道列名,我需要联合所有键。有没有一种方法可以避免这种收集操作,即知道所有密钥并只使用一次rdd.saveAsTextFile(..)来获取csv?

例如,假设我有一个包含两个元素的RDD(标量表示法):

Map("a"->1, "b"->2)
Map("b"->1, "c"->3)

我想最终得到这个csv:

a,b,c
1,2,0
0,1,3

Scala解决方案更好,但任何其他兼容Spark的语言都可以

编辑:

我也可以尝试从另一个方向解决我的问题。假设我一开始就知道所有的列,但我想去掉所有映射中值为0的列。所以问题变成了,我知道密钥是("a","b","c"),由此:

Map("a"->1, "b"->2, "c"->0)
Map("a"->3, "b"->1, "c"->0)

我需要写csv:

a,b
1,2
3,1

只需要一次收款就可以做到这一点吗?

如果您的语句是:"我的RDD中的每个新元素都可能添加一个我迄今为止从未见过的新列名",那么答案显然是无法避免全面扫描。但您不需要收集驱动程序上的所有元素。

可以使用aggregate仅收集列名。这个方法有两个函数,一个是将单个元素插入到生成的集合中,另一个是合并来自两个不同分区的结果。

rdd.aggregate(Set.empty[String])( {(s, m) => s union m.keySet }, { (s1, s2) => s1 union s2 })

您将返回RDD中所有列名的集合。在第二次扫描中,您可以打印CSV文件。

Scala和任何其他受支持的语言

您可以使用spark-csv

首先让我们查找所有现有列:

val cols = sc.broadcast(rdd.flatMap(_.keys).distinct().collect())

创建RDD[行]:

val rows = rdd.map {
    row => { Row.fromSeq(cols.value.map { row.getOrElse(_, 0) })}
}

准备架构:

import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
val schema = StructType(
    cols.value.map(field => StructField(field, IntegerType, true)))

将RDD[行]转换为数据帧:

val df = sqlContext.createDataFrame(rows, schema)

写入结果:

// Spark 1.4+, for other versions see spark-csv docs
df.write.format("com.databricks.spark.csv").save("mycsv.csv")

您可以使用其他支持的语言来做几乎相同的事情。

Python

如果您使用Python,并且最终数据适合驱动程序内存,则可以通过toPandas()方法使用Pandas:

rdd = sc.parallelize([{'a': 1, 'b': 2}, {'b': 1, 'c': 3}])
cols = sc.broadcast(rdd.flatMap(lambda row: row.keys()).distinct().collect())
df = sqlContext.createDataFrame(
    rdd.map(lambda row: {k: row.get(k, 0) for k in cols.value}))
df.toPandas().save('mycsv.csv')

或直接:

import pandas as pd 
pd.DataFrame(rdd.collect()).fillna(0).save('mycsv.csv')

编辑

第二个collect的一种可能方法是使用累加器来构建一组所有列名,或者在找到零的地方对这些列名进行计数,并使用这些信息来映射行,删除不必要的列或添加零。

这是可能的,但效率低下,感觉像作弊。唯一有意义的情况是零的数量非常少,但我想这里不是这样。

object ColsSetParam extends AccumulatorParam[Set[String]] {
  def zero(initialValue: Set[String]): Set[String] = {
    Set.empty[String]
  }
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = {
    s1 ++ s2
  }
}
val colSetAccum = sc.accumulator(Set.empty[String])(ColsSetParam)
rdd.foreach { colSetAccum += _.keys.toSet } 

// We assume you know this upfront
val allColnames = sc.broadcast(Set("a", "b", "c"))
object ZeroColsParam extends AccumulatorParam[Map[String, Int]] {
  def zero(initialValue: Map[String, Int]): Map[String, Int] = {
    Map.empty[String, Int]
  }
  def addInPlace(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] = {
    val keys = m1.keys ++ m2.keys
    keys.map(
      (k: String) => (k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0)))).toMap
  }
}
val accum = sc.accumulator(Map.empty[String, Int])(ZeroColsParam)
rdd.foreach { row =>
  // If allColnames.value -- row.keys.toSet is empty we can avoid this part
  accum += (allColnames.value -- row.keys.toSet).map(x => (x -> 1)).toMap
}

最新更新