如何删除没有顺序的Spark值



我需要从数据框中删除一些不在正确位置的值。

我有以下数据框架,例如:

+-----+-----+
|count|PHASE|
+-----+-----+
|    1|    3|
|    2|    3|
|    3|    6|
|    4|    6|
|    5|    8|
|    6|    4|
|    7|    4|
|    8|    4|
+-----+-----+

我需要从数据框架中删除6和8,因为一些规则:

phase === 3 and lastPhase.isNull
phase === 4 and lastPhase.isin(2, 3)
phase === 6 and lastPhase.isin(4, 5)
phase === 8 and lastPhase.isin(6, 7)

这是一个巨大的数据框架,这些错误的值可能会发生很多次。你能帮我一下吗?

预期输出:

+-----+-----+------+
|count|PHASE|CHANGE|
+-----+-----+------+
|    1|    3|     3|
|    2|    3|     3|
|    3|    6|     3|
|    4|    6|     3|
|    5|    8|     3|
|    6|    4|     4|
|    7|    4|     4|
|    8|    4|     4|
+-----+-----+------+
val rows = Seq(
Row(1, 3),
Row(2, 3),
Row(3, 6),
Row(4, 6),
Row(5, 8),
Row(6, 4),
Row(7, 4),
Row(8, 4)
)
val schema = StructType(
Seq(StructField("count", IntegerType), StructField("PHASE", IntegerType))
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
schema
)

提前感谢!

如果我正确理解了你的问题,你想要填充列CHANGE如下:

对于按count列排序的数据框,对于每一行,如果PHASE列的值匹配一组定义的规则,则在CHANGE列中设置该值。如果value与规则不匹配,则在CHANGE

中设置最新有效的PHASE
你可以使用一个用户定义的聚合函数在一个由COUNTcolumn排序的窗口上设置CHANGE

列首先,定义一个Aggregator对象,它的缓冲区将是最后一个有效阶段,并在其reduce函数中实现规则集:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
object LatestValidPhase extends Aggregator[Integer, Integer, Integer] {
def zero: Integer = null
def reduce(lastPhase: Integer, phase: Integer): Integer = {
if (lastPhase == null && phase == 3) {
phase
} else if (Set(2, 3).contains(lastPhase) && phase == 4) {
phase
} else if (Set(4, 5).contains(lastPhase) && phase == 6) {
phase
} else if (Set(6, 7).contains(lastPhase) && phase == 8) {
phase
} else {
lastPhase
}
}
def merge(b1: Integer, b2: Integer): Integer = {
throw new NotImplementedError("should not use as general aggregation")
}
def finish(reduction: Integer): Integer = reduction
def bufferEncoder: Encoder[Integer] = Encoders.INT
def outputEncoder: Encoder[Integer] = Encoders.INT
}

然后将其转换为一个聚合用户定义函数,该函数应用于按COUNTcolumn:

排序的窗口
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}
val latest_valid_phase = udaf(LatestValidPhase)
val window = Window.orderBy("count")
df.withColumn("CHANGE", latest_valid_phase(col("PHASE")).over(window))

最新更新