根据在不同列中测试的条件创建具有组标签的新列



我想根据在另一列中测试的条件创建组标签。特别是,如果有directionChange.equalTo(1)的值,我想开始一个新的段(标签(。应该的结果在列中给出segmentNr我的代码产生的结果是nSegment. 我认为不可能以这种方式进行类似的作业。 最后,我想计算不同的方面,例如每个段的值的总和、平均值、最大值(不在问题范围内(。

输入示例:

+---+-----+---------------+---------+--------+
| id|value|directionChange|segmentNr|nSegment|
+---+-----+---------------+---------+--------+
|  1| 11.0|              0|        1|       1|
|  2|-22.0|              1|        2|       1|
|  3| 34.0|              0|        2|       1|    
|  4|-47.0|              1|        3|       1|    
|  5| 61.0|              1|        4|       1|    
|  6| 22.0|              0|        4|       1|    
|  7|  5.0|              0|        4|       1|    
|  8| -7.0|              1|        5|       1|    
+---+-----+---------------+---------+--------+

使用输入数据集添加新列的函数:

public static Dataset<Row> createSegments(Dataset<Row> dataset, String columnName, int start, String newColumnName) throws Exception
{
int test = 1;
Dataset<Row> resultDataset = dataset.withColumn(newColumnName, //
functions.when(dataset.col(columnName).equalTo(1), (start = start + 1))//
.otherwise(start));
return resultDataset;
}

该函数的调用方式如下:

dataset = createSegments(dataset, "directionChange", 0, "nSegment");

这可以使用Window函数来完成。但是,由于您没有用于对数据进行分区的列,因此对于大型数据集来说,它可能会变得非常慢。这可以通过在下面的Window对象上使用partitionBy(column)来改进。但是,这需要一个好的列来划分,最终结果也会分成几个部分。

该解决方案背后的想法是按id列排序时,对directionChange列进行累积总和。在斯卡拉:

val window = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val df2 = dataset.withColumn("nSegment", sum($"directionChange").over(window) + 1)

爪哇代码:

WindowSpec window = Window.orderBy("id").rowsBetween(Window.unboundedPreceding(), Window.currentRow()); 
Dataset<Row> df2 = dataset.withColumn("nSegment", functions.sum("directionChange").over(window));

在旧版本的 Spark (<2.1.0( 中,请使用:

rowsBetween(Long.MinValue, 0)

这将创建一个新的列nSegment,该列等于输入数据的segmentNr。下一步,您可以使用groupBy("nSegment").agg(...)计算每个细分的不同指标。

最新更新