Alternative ways to proceed without a list in Scala



I have this Scala code:

def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
  val currentTimeStamp = list(1).toLong // loads the timestamp column
  var sum = 0.0
  var count = 0
  var check = false
  import scala.util.control.Breaks._
  breakable {
    for (array <- buffer) {
      val toCheckTimeStamp = array(1).toLong // timestamp column
      if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) { // check for a 10-second timestamp difference
        sum += array(5).toDouble // RSSI weightage values
        count += 1
      }
      if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
        check = true
        break
      }
    }
  }
  list :+ sum
}

I call the above function like this:

import spark.implicits._

val averageDF =
  filterop.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
    .sortBy(array => array(1), false) // sort by timestamp
    .groupBy(array => (array(0), array(2))) // group by tag and listener
    .mapValues(buffer => {
      buffer.map(list => {
        avgCalc(buffer, list) // calling the average function
      })
    })
    .flatMap(x => x._2)
    .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble)) // defining the schema through the case class
    .toDF // converting to a DataFrame
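
For reference, the findingavg case class is not shown in the question; judging from the seven argument types above and the column names that appear further down, it presumably looks something like this:

// Assumed shape of the findingavg case class, inferred from the arguments
// passed above; the actual field names in the original code may differ.
case class findingavg(tag: String, timestamp: Long, listner: String,
                      x: String, y: String, rssi: Double, avgCalc: Double)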

The code above works fine, but I need to get rid of the list. My senior asked me to remove it, since the list slows down execution. Any suggestions on how to proceed without the list? Any help would be appreciated.

I think the following solution should work; I am trying to avoid passing both the iterable and a single array at the same time.

def avgCalc(buffer: Iterable[Array[String]]) = {
  var finalArray = Array.empty[Array[String]]
  import scala.util.control.Breaks._
  for (outerArray <- buffer) {
    val currentTimeStamp = outerArray(1).toLong
    var sum = 0.0
    var count = 0
    var check = false
    var list = outerArray
    breakable { // wrap only the inner loop, so break does not abort the outer iteration
      for (array <- buffer) {
        val toCheckTimeStamp = array(1).toLong
        if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
          sum += array(5).toDouble
          count += 1
        }
        if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
          check = true
          break
        }
      }
    }
    if (sum != 0.0 && check) list = list :+ (sum / count).toString
    else list = list :+ list(5).toDouble.toString
    finalArray ++= Array(list)
  }
  finalArray
}

You can call it like this:

import sqlContext.implicits._

val averageDF =
  filter_op.rdd.map(_.mkString(",")).map(line => line.split(",").map(_.trim))
    .sortBy(array => array(1), false)
    .groupBy(array => (array(0), array(2)))
    .mapValues(buffer => avgCalc(buffer)) // no per-row list argument any more
    .flatMap(x => x._2)
    .map(x => findingavg(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
    .toDF

I hope this is the desired answer.

I can see that you have already accepted an answer, but I have to say you have a lot of unnecessary code. As far as I can tell, there is no reason to do the initial conversion to the Array type in the first place, and the sortBy is not needed at that point either. I would suggest you work directly on the Row.

Also, you have many unused variables that should be removed, and converting to a case class followed by a toDF seems like too much, IMHO.

I would do something like this:

import org.apache.spark.sql.Row

def avgCalc(sortedList: List[Row]) = {
  sortedList.indices.map(i => {
    var sum = 0.0
    val row = sortedList(i)
    val currentTimeStamp = row.getString(1).toLong // loads the timestamp column
    import scala.util.control.Breaks._
    breakable {
      for (j <- 0 until sortedList.length) {
        if (j != i) {
          val anotherRow = sortedList(j)
          val toCheckTimeStamp = anotherRow.getString(1).toLong // timestamp column
          if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) { // check for a 10-second timestamp difference
            sum += anotherRow.getString(5).toDouble // RSSI weightage values
          }
          if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
            break
          }
        }
      }
    }
    (row.getString(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4), row.getString(5), sum.toString)
  })
}

val averageDF = filterop.rdd
  .groupBy(row => (row(0), row(2)))
  .flatMap { case (_, buffer) => avgCalc(buffer.toList.sortBy(_.getString(1).toLong)) }
  .toDF("Tag", "Timestamp", "Listner", "X", "Y", "RSSI", "AvgCalc")

As a final comment, I am pretty sure it is possible to come up with a better/cleaner implementation of the avgCalc function, but I will leave it to you to play around with that :)
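
To pick up that last remark, here is one possible cleaner variant, offered only as a minimal sketch under the same assumptions as above (timestamp stored as a string at index 1, RSSI at index 5). It replaces the mutable state and the breakable loop with a single collect over the indexed rows:

import org.apache.spark.sql.Row

// A minimal sketch of a loop-free avgCalc. For each row it sums the RSSI of
// every *other* row whose timestamp lies in the 10-second window ending at
// the current row, mirroring the j != i check in the version above.
def avgCalcClean(sortedList: List[Row]) =
  sortedList.zipWithIndex.map { case (row, i) =>
    val current = row.getString(1).toLong
    val sum = sortedList.zipWithIndex.collect {
      case (other, j) if j != i &&
        other.getString(1).toLong >= current - 10L &&
        other.getString(1).toLong <= current =>
        other.getString(5).toDouble
    }.sum
    (row.getString(0), row.getString(1), row.getString(2),
      row.getString(3), row.getString(4), row.getString(5), sum.toString)
  }

This drops the vars and the Breaks import entirely; it does give up the early-exit optimisation of the original break, but for groups of this size the readability is usually worth it.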
