如何组合和聚合数据帧行



我有一个数据帧,看起来有点像这样:

endPoint    power    time
device1      -4       0
device2       3       0
device3      -2       0
device4       0       0
device5       5       0
device6      -5       0
device1       4       1
device2      -3       1
device3       5       1
device4      -2       1
device5       1       1
device6       4       1
....
device1       6       x
device2      -5       x
device3       4       x
device4       3       x
device5      -1       x
device6       1       x

我想把它改成这样:

span               powerAboveThreshold    time
device1-device3        true                 0
device2-device6        true                 0
...
devicex-devicey        false                w

我想将行聚合成两个新列,并按时间和跨度进行分组。powerAboveThreshold的值取决于该范围内任一器件的power是否大于0,因此如果devicexdevicey小于0则为假。

作为旁注,有一个设备跨度包含4个设备-而其余的只包含2个设备。我需要在设计时考虑到这一点。

device1-device3-device6-device2

我使用Apache Spark DataFrame API/Spark SQL来完成这个。

编辑:

我可以将数据帧转换为RDD并以这种方式计算它吗?

edit2:

对Daniel L的后续问题:

就我目前所知,

似乎是一个很好的答案。我有几个问题:

  • RDD是否具有预期的结构,如果我从数据框转换它?
  • 这部分节目是怎么回事?.aggregateByKey(Result())((result, sample) => aggregateSample(result, sample), addResults)。我看到它对每个键值对(结果,示例)运行aggregateSample(),但是addResults调用是如何工作的?是否会在与键相关的每个项上调用它以将aggregateSample生成的每个连续结果添加到前面的结果中?我不太明白。
  • .map(_._2)在做什么?
  • 在什么情况下result.spanaggregateSample函数中为空?
  • 在什么情况下res1.spanaddResults函数中为空?

对于所有的问题,我很抱歉,但是我是函数式编程、Scala和Spark的新手,所以我有很多东西要记住!

我不确定你能做的文本连接,因为你想在DataFrames(也许你可以),但在一个正常的RDD你可以这样做:

val rdd = sc.makeRDD(Seq(
  ("device1", -4, 0),
  ("device2", 3, 0),
  ("device3", -2, 0),
  ("device4", 0, 0),
  ("device5", 5, 0),
  ("device6", -5, 0),
  ("device1", 4, 1),
  ("device2", -3, 1),
  ("device3", 5, 1),
  ("device4", 1, 1),
  ("device5", 1, 1),
  ("device6", 4, 1)))
val spanMap = Map(
"device1" -> 1,
"device2" -> 1,
"device3" -> 1,
"device4" -> 2,
"device5" -> 2,
"device6" -> 1
)
case class Result(var span: String = "", var aboveThreshold: Boolean = true, var time: Int = -1)
def aggregateSample(result: Result, sample: (String, Int, Int)) = {
  result.time = sample._3
  result.aboveThreshold = result.aboveThreshold && (sample._2 > 0)
  if(result.span.isEmpty)
    result.span += sample._1
  else
    result.span += "-" + sample._1
  result
}
def addResults(res1: Result, res2: Result) = {
  res1.aboveThreshold = res1.aboveThreshold && res2.aboveThreshold
  if(res1.span.isEmpty)
    res1.span += res2.span
  else
    res1.span += "-" + res2.span
  res1
}
val results = rdd
  .map(x => (x._3, spanMap.getOrElse(x._1, 0)) -> x)  // Create a key to agregate with, by time and span
  .aggregateByKey(Result())((result, sample) => aggregateSample(result, sample), addResults)
  .map(_._2)
results.collect().foreach(println(_))

它打印了这个,这是我理解你需要的:

Result(device4-device5,false,0)
Result(device4-device5,true,1)
Result(device1-device2-device3-device6,false,0)
Result(device1-device2-device3-device6,false,1)

这里我使用一个映射来告诉我哪些设备会在一起(对于你的配对和4-device例外),你可能想用其他函数替换它,将其硬编码为静态函数以避免序列化或使用广播变量。

=================== 编辑 ==========================

就我目前所知,这似乎是一个很好的答案。

请随意投票/接受它,帮助我和其他人寻找问题的答案:-)

如果我从数据框架转换RDD,它会有预期的结构吗?

是的,主要区别在于DataFrame包含一个模式,因此它可以更好地优化底层调用,直接使用该模式或映射到我用作示例的元组应该是微不足道的,我这样做主要是为了方便。Hernan刚刚发布了另一个答案,显示了其中的一些内容(并且为了方便起见,还复制了我使用的初始测试数据),所以我不会重复那部分内容,但是正如他提到的那样,您的设备跨度分组和表示是棘手的,因此我更喜欢在RDD上采用更强制的方式。

节目的这一部分发生了什么?. aggregatebykey (Result())((Result, sample) => aggregateSample(Result, sample), addressults)。我看到它对每个键值对(结果、样本)运行aggregateSample(),但是addressults调用是如何工作的呢?是否在与键相关的每个项上调用它来将aggregateSample生成的每个连续结果添加到前面的结果中?我不完全明白。

aggregateByKey是一个非常优的函数。为了避免将所有数据从一个节点转移到另一个节点,然后再进行合并,它首先在本地对每个键的单个结果进行样本聚合(第一个函数)。然后它将这些结果进行洗牌并将其相加(第二个函数)。

.map(_._2)在做什么?

聚合后简单地从键/值RDD中丢弃键,您不再关心它,所以我只保留结果。

在什么情况下会导致。跨度是空的aggregateSample函数?在什么情况下会发生?跨度是空的地址结果函数?

在进行聚合时,需要提供一个"零"值。例如,如果要聚合数字,Spark会执行(0 + firstValue) + secondValue…等。if子句防止在第一个设备名称之前添加虚假的'-',因为我们将它放在设备之间。这与在项目列表中添加一个额外的逗号等处理没有什么不同。查看aggregateByKey的文档和示例,它将对您有很大帮助。

这是数据帧的实现(没有连接的名称):

val data = Seq(
  ("device1", -4, 0),
  ("device2", 3, 0),
  ("device3", -2, 0),
  ("device4", 0, 0),
  ("device5", 5, 0),
  ("device6", -5, 0),
  ("device1", 4, 1),
  ("device2", -3, 1),
  ("device3", 5, 1),
  ("device4", 1, 1),
  ("device5", 1, 1),
  ("device6", 4, 1)).toDF("endPoint", "power", "time")
val mapping = Seq(
  "device1" -> 1,
  "device2" -> 1,
  "device3" -> 1,
  "device4" -> 2,
  "device5" -> 2,
  "device6" -> 1).toDF("endPoint", "span")
data.as("A").
  join(mapping.as("B"), $"B.endpoint" === $"A.endpoint", "inner").
  groupBy($"B.span", $"A.time").
  agg(min($"A.power" > 0).as("powerAboveThreshold")).
  show()

连接名称相当困难,这需要您编写自己的UDAF(在Spark的下一个版本中支持),或者使用Hive函数的组合。

相关内容

  • 没有找到相关文章

最新更新