我有一个数据帧,看起来有点像这样:
endPoint power time
device1 -4 0
device2 3 0
device3 -2 0
device4 0 0
device5 5 0
device6 -5 0
device1 4 1
device2 -3 1
device3 5 1
device4 -2 1
device5 1 1
device6 4 1
....
device1 6 x
device2 -5 x
device3 4 x
device4 3 x
device5 -1 x
device6 1 x
我想把它改成这样:
span powerAboveThreshold time
device1-device3 true 0
device2-device6 true 0
...
devicex-devicey false w
我想将行聚合成两个新列,并按时间和跨度进行分组。powerAboveThreshold
的值取决于该范围内任一器件的power
是否大于0,因此如果devicex
或devicey
小于0则为假。
作为旁注,有一个设备跨度包含4个设备-而其余的只包含2个设备。我需要在设计时考虑到这一点。
device1-device3-device6-device2
我使用Apache Spark DataFrame API/Spark SQL来完成这个。
编辑:我可以将数据帧转换为RDD并以这种方式计算它吗?
edit2:
对Daniel L的后续问题:
就我目前所知,似乎是一个很好的答案。我有几个问题:
- RDD是否具有预期的结构,如果我从数据框转换它?
- 这部分节目是怎么回事?
.aggregateByKey(Result())((result, sample) => aggregateSample(result, sample), addResults)
。我看到它对每个键值对(结果,示例)运行aggregateSample()
,但是addResults
调用是如何工作的?是否会在与键相关的每个项上调用它以将aggregateSample
生成的每个连续结果添加到前面的结果中?我不太明白。 - 在什么情况下
result.span
在aggregateSample
函数中为空? - 在什么情况下
res1.span
在addResults
函数中为空?
.map(_._2)
在做什么?对于所有的问题,我很抱歉,但是我是函数式编程、Scala和Spark的新手,所以我有很多东西要记住!
我不确定你能做的文本连接,因为你想在DataFrames(也许你可以),但在一个正常的RDD你可以这样做:
val rdd = sc.makeRDD(Seq(
("device1", -4, 0),
("device2", 3, 0),
("device3", -2, 0),
("device4", 0, 0),
("device5", 5, 0),
("device6", -5, 0),
("device1", 4, 1),
("device2", -3, 1),
("device3", 5, 1),
("device4", 1, 1),
("device5", 1, 1),
("device6", 4, 1)))
val spanMap = Map(
"device1" -> 1,
"device2" -> 1,
"device3" -> 1,
"device4" -> 2,
"device5" -> 2,
"device6" -> 1
)
case class Result(var span: String = "", var aboveThreshold: Boolean = true, var time: Int = -1)
def aggregateSample(result: Result, sample: (String, Int, Int)) = {
result.time = sample._3
result.aboveThreshold = result.aboveThreshold && (sample._2 > 0)
if(result.span.isEmpty)
result.span += sample._1
else
result.span += "-" + sample._1
result
}
def addResults(res1: Result, res2: Result) = {
res1.aboveThreshold = res1.aboveThreshold && res2.aboveThreshold
if(res1.span.isEmpty)
res1.span += res2.span
else
res1.span += "-" + res2.span
res1
}
val results = rdd
.map(x => (x._3, spanMap.getOrElse(x._1, 0)) -> x) // Create a key to agregate with, by time and span
.aggregateByKey(Result())((result, sample) => aggregateSample(result, sample), addResults)
.map(_._2)
results.collect().foreach(println(_))
它打印了这个,这是我理解你需要的:
Result(device4-device5,false,0)
Result(device4-device5,true,1)
Result(device1-device2-device3-device6,false,0)
Result(device1-device2-device3-device6,false,1)
这里我使用一个映射来告诉我哪些设备会在一起(对于你的配对和4-device例外),你可能想用其他函数替换它,将其硬编码为静态函数以避免序列化或使用广播变量。
=================== 编辑 ==========================
就我目前所知,这似乎是一个很好的答案。
请随意投票/接受它,帮助我和其他人寻找问题的答案:-)
如果我从数据框架转换RDD,它会有预期的结构吗?
是的,主要区别在于DataFrame包含一个模式,因此它可以更好地优化底层调用,直接使用该模式或映射到我用作示例的元组应该是微不足道的,我这样做主要是为了方便。Hernan刚刚发布了另一个答案,显示了其中的一些内容(并且为了方便起见,还复制了我使用的初始测试数据),所以我不会重复那部分内容,但是正如他提到的那样,您的设备跨度分组和表示是棘手的,因此我更喜欢在RDD上采用更强制的方式。
节目的这一部分发生了什么?. aggregatebykey (Result())((Result, sample) => aggregateSample(Result, sample), addressults)。我看到它对每个键值对(结果、样本)运行aggregateSample(),但是addressults调用是如何工作的呢?是否在与键相关的每个项上调用它来将aggregateSample生成的每个连续结果添加到前面的结果中?我不完全明白。
aggregateByKey
是一个非常优的函数。为了避免将所有数据从一个节点转移到另一个节点,然后再进行合并,它首先在本地对每个键的单个结果进行样本聚合(第一个函数)。然后它将这些结果进行洗牌并将其相加(第二个函数)。
.map(_._2)在做什么?
聚合后简单地从键/值RDD中丢弃键,您不再关心它,所以我只保留结果。
在什么情况下会导致。跨度是空的aggregateSample函数?在什么情况下会发生?跨度是空的地址结果函数?
在进行聚合时,需要提供一个"零"值。例如,如果要聚合数字,Spark会执行(0 + firstValue) + secondValue…等。if子句防止在第一个设备名称之前添加虚假的'-',因为我们将它放在设备之间。这与在项目列表中添加一个额外的逗号等处理没有什么不同。查看aggregateByKey
的文档和示例,它将对您有很大帮助。
这是数据帧的实现(没有连接的名称):
val data = Seq(
("device1", -4, 0),
("device2", 3, 0),
("device3", -2, 0),
("device4", 0, 0),
("device5", 5, 0),
("device6", -5, 0),
("device1", 4, 1),
("device2", -3, 1),
("device3", 5, 1),
("device4", 1, 1),
("device5", 1, 1),
("device6", 4, 1)).toDF("endPoint", "power", "time")
val mapping = Seq(
"device1" -> 1,
"device2" -> 1,
"device3" -> 1,
"device4" -> 2,
"device5" -> 2,
"device6" -> 1).toDF("endPoint", "span")
data.as("A").
join(mapping.as("B"), $"B.endpoint" === $"A.endpoint", "inner").
groupBy($"B.span", $"A.time").
agg(min($"A.power" > 0).as("powerAboveThreshold")).
show()
连接名称相当困难,这需要您编写自己的UDAF(在Spark的下一个版本中支持),或者使用Hive函数的组合。