使用 scala 计算分配给每个元素的最大长度



例如,这是文件中的内容:

20,1,helloworld,alaaa
2,3,world,neww
1,223,ala,12341234

期望的输出">

0-> 2
1-> 3
2-> 10
3-> 8

我想找到分配给每个元素的最大长度。

可以将其扩展到任意数量的列。首先将文件作为数据帧读取:

val df = spark.read.csv("path")

然后为每列创建一个 SQL 表达式,并使用expr对其进行计算:

val cols = df.columns.map(c => s"max(length(cast($c as String)))").map(expr(_))

选择新列作为数组并隐蔽Map

df.select(array(cols:_*)).as[Seq[Int]].collect()
.head
.zipWithIndex.map(_.swap)
.toMap

这应该给你所需的Map.

Map(0 -> 2, 1 -> 3, 2 -> 10, 3 -> 8)

更新:

  1. OP的例子表明它们将具有相等的长度。

  2. 在DF列上使用Spark-SQL和max(length(((是这个答案中提出的想法。

你可以做:

val xx = Seq(
("20","1","helloworld","alaaa"),
("2","3","world","neww"),
("1","223","ala","12341234")
).toDF("a", "b", "c", "d")
xx.registerTempTable("yy")
spark.sql("select max(length(a)), max(length(b)), max(length(c)), max(length(d)) from yy")

我建议使用RDD的聚合方法:

val rdd = sc.textFile("/path/to/textfile").
map(_.split(","))
// res1: Array[Array[String]] = Array(
//   Array(20, 1, helloworld, alaaa), Array(2, 3, world, neww), Array(1, 223, ala, 12341234)
// )
val seqOp = (m: Array[Int], r: Array[String]) =>
(r zip m).map( t => Seq(t._1.length, t._2).max )
val combOp = (m1: Array[Int], m2: Array[Int]) =>
(m1 zip m2).map( t => Seq(t._1, t._2).max )
val size = rdd.collect.head.size
rdd.
aggregate( Array.fill[Int](size)(0) )( seqOp, combOp ).
zipWithIndex.map(_.swap).
toMap
// res2: scala.collection.immutable.Map[Int,Int] = Map(0 -> 2, 1 -> 3, 2 -> 10, 3 -> 8)

请注意,aggregate需要:

  1. 一个 0 的数组(大小等于 RDD 的行大小(作为初始值,
  2. seqOp用于计算分区内最大字符串长度的函数,以及
  3. 另一个函数combOp跨分区合并结果以获得最终最大值。

最新更新