我有一个RDD[Map[String,Int]],其中映射的键是列名。每个映射都是不完整的,为了知道列名,我需要联合所有键。有没有一种方法可以避免这种收集操作,即知道所有密钥并只使用一次rdd.saveAsTextFile(..)来获取csv?
例如,假设我有一个包含两个元素的RDD(标量表示法):
Map("a"->1, "b"->2)
Map("b"->1, "c"->3)
我想最终得到这个csv:
a,b,c
1,2,0
0,1,3
Scala解决方案更好,但任何其他兼容Spark的语言都可以
编辑:
我也可以尝试从另一个方向解决我的问题。假设我一开始就知道所有的列,但我想去掉所有映射中值为0的列。所以问题变成了,我知道密钥是("a","b","c"),由此:
Map("a"->1, "b"->2, "c"->0)
Map("a"->3, "b"->1, "c"->0)
我需要写csv:
a,b
1,2
3,1
只需要一次收款就可以做到这一点吗?
如果您的语句是:"我的RDD中的每个新元素都可能添加一个我迄今为止从未见过的新列名",那么答案显然是无法避免全面扫描。但您不需要收集驱动程序上的所有元素。
可以使用aggregate
仅收集列名。这个方法有两个函数,一个是将单个元素插入到生成的集合中,另一个是合并来自两个不同分区的结果。
rdd.aggregate(Set.empty[String])( {(s, m) => s union m.keySet }, { (s1, s2) => s1 union s2 })
您将返回RDD中所有列名的集合。在第二次扫描中,您可以打印CSV文件。
Scala和任何其他受支持的语言
您可以使用spark-csv
首先让我们查找所有现有列:
val cols = sc.broadcast(rdd.flatMap(_.keys).distinct().collect())
创建RDD[行]:
val rows = rdd.map {
row => { Row.fromSeq(cols.value.map { row.getOrElse(_, 0) })}
}
准备架构:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
val schema = StructType(
cols.value.map(field => StructField(field, IntegerType, true)))
将RDD[行]转换为数据帧:
val df = sqlContext.createDataFrame(rows, schema)
写入结果:
// Spark 1.4+, for other versions see spark-csv docs
df.write.format("com.databricks.spark.csv").save("mycsv.csv")
您可以使用其他支持的语言来做几乎相同的事情。
Python
如果您使用Python,并且最终数据适合驱动程序内存,则可以通过toPandas()
方法使用Pandas:
rdd = sc.parallelize([{'a': 1, 'b': 2}, {'b': 1, 'c': 3}])
cols = sc.broadcast(rdd.flatMap(lambda row: row.keys()).distinct().collect())
df = sqlContext.createDataFrame(
rdd.map(lambda row: {k: row.get(k, 0) for k in cols.value}))
df.toPandas().save('mycsv.csv')
或直接:
import pandas as pd
pd.DataFrame(rdd.collect()).fillna(0).save('mycsv.csv')
编辑
第二个collect
的一种可能方法是使用累加器来构建一组所有列名,或者在找到零的地方对这些列名进行计数,并使用这些信息来映射行,删除不必要的列或添加零。
这是可能的,但效率低下,感觉像作弊。唯一有意义的情况是零的数量非常少,但我想这里不是这样。
object ColsSetParam extends AccumulatorParam[Set[String]] {
def zero(initialValue: Set[String]): Set[String] = {
Set.empty[String]
}
def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = {
s1 ++ s2
}
}
val colSetAccum = sc.accumulator(Set.empty[String])(ColsSetParam)
rdd.foreach { colSetAccum += _.keys.toSet }
或
// We assume you know this upfront
val allColnames = sc.broadcast(Set("a", "b", "c"))
object ZeroColsParam extends AccumulatorParam[Map[String, Int]] {
def zero(initialValue: Map[String, Int]): Map[String, Int] = {
Map.empty[String, Int]
}
def addInPlace(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] = {
val keys = m1.keys ++ m2.keys
keys.map(
(k: String) => (k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0)))).toMap
}
}
val accum = sc.accumulator(Map.empty[String, Int])(ZeroColsParam)
rdd.foreach { row =>
// If allColnames.value -- row.keys.toSet is empty we can avoid this part
accum += (allColnames.value -- row.keys.toSet).map(x => (x -> 1)).toMap
}