我有一个带有许多信号的数据框
我有运行代码,但是我的执行时间很长。仅需一堆一百个信号,大约需要13分钟。
这是我在开始的inputdataframe(示例):
+----------+-----+
|SignalName|Value|
+----------+-----+
| S1| V1|
| S2| V1|
| S1| V2|
| S2| V2|
| S3| V1|
| S1| V3|
| S1| V1|
+----------+-----+
然后我想过滤重复项
var reducedDF = inputDataFrame.select("SignalName","Value").dropDuplicates()
redueddf.show的OUPUT:
+----------+-----+
|SignalName|Value|
+----------+-----+
| S1| V1|
| S1| V2|
| S1| V3|
| S2| V1|
| S2| V2|
| S3| V1|
+----------+-----+
下一步是获取无重复的信号名称的rdd。而且我使用了ZipWithIndex(),因为以后我想读取RDD的每个值。我可以使用以下代码执行此操作:
var RDDOfSignalNames = reducedDF.select("SignalName").rdd.map(r => r(0).asInstanceOf[String])
RDDOfSignalNames = RDDOfSignalNames.distinct()
val RDDwithIndex = RDDOfSignalNames.zipWithIndex()
val indexKey = RDDwithIndex.map { case (k, v) => (v, k) }
现在,最后一步是为每个可能的每个可能的值列出一个类型列表[字符串]的列表,并将其添加到地图:
:var dataTmp: DataFrame = null
var signalname = Seq[String]("")
var map = scala.collection.mutable.Map[String, List[String]]()
for (i <- 0 to (RDDOfSignalNames.count()).toInt - 1) {
signalname = indexKey.lookup(i)
dataTmp = reducedDF.filter(data.col("Signalname").contains(signalname(0)))
map += (signalname(0) -> dataTmp.rdd.map(r => r(0).asInstanceOf[String]).collect().toList)
println(i+"/"+(RDDOfSignalNames.count().toInt - 1).toString())
}
最后,地图看起来像这样:
scala.collection.mutable.Map[String,List[String]] = Map(S1 -> List(V1, V2, V3), S3 -> List(V1), S2 -> List(V1, V2))
问题是线映射 = ...对于106个信号,大约需要13分钟!有更有效的方法吗?
首先,不建议在 scala 中使用var
。您应该始终尝试使用不变变量。因此更改以下行
var reducedDF = inputDataFrame.select("SignalName","Value").dropDuplicates()
to
val reducedDF = inputDataFrame.select("SignalName","Value").distinct()
是首选。
和,
您不需要经历此类复杂性即可获得所需的输出。您可以使所需的输出执行以下
import org.apache.spark.sql.functions.collect_list
reducedDF
.groupBy("SignalName")
.agg(collect_list($"Value").as("Value"))
.rdd
.map(row => (row(0).toString -> row(1).asInstanceOf[scala.collection.mutable.WrappedArray[String]].toList))
.collectAsMap()
在哪里,
reducedDF.groupBy("SignalName").agg(collect_list($"Value").as("Value"))
给您dataframe
为
+----------+------------+
|SignalName|Value |
+----------+------------+
|S3 |[V1] |
|S2 |[V2, V1] |
|S1 |[V1, V2, V3]|
+----------+------------+
其余代码.rdd.map(row => (row(0).toString -> row(1).asInstanceOf[scala.collection.mutable.WrappedArray[String]].toList)).collectAsMap()
只是将dataframe
转换为所需的输出Map
。
最终地图输出是
Map(S1 -> List(V1, V2, V3), S3 -> List(V1), S2 -> List(V2, V1))