I need to remove some values from a dataframe that are not in the right place.
For example, I have the following dataframe:
+-----+-----+
|count|PHASE|
+-----+-----+
|    1|    3|
|    2|    3|
|    3|    6|
|    4|    6|
|    5|    8|
|    6|    4|
|    7|    4|
|    8|    4|
+-----+-----+
I need to remove the 6 and 8 values from the dataframe, because a phase is only valid when it follows one of these rules:
phase === 3 and lastPhase.isNull
phase === 4 and lastPhase.isin(2, 3)
phase === 6 and lastPhase.isin(4, 5)
phase === 8 and lastPhase.isin(6, 7)
This is a huge dataframe, and these wrong values can occur many times. Can you help me?
Expected output:
+-----+-----+------+
|count|PHASE|CHANGE|
+-----+-----+------+
|    1|    3|     3|
|    2|    3|     3|
|    3|    6|     3|
|    4|    6|     3|
|    5|    8|     3|
|    6|    4|     4|
|    7|    4|     4|
|    8|    4|     4|
+-----+-----+------+
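import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Sample rows: (count, PHASE)
val rows = Seq(
  Row(1, 3),
  Row(2, 3),
  Row(3, 6),
  Row(4, 6),
  Row(5, 8),
  Row(6, 4),
  Row(7, 4),
  Row(8, 4)
)
val schema = StructType(
  Seq(StructField("count", IntegerType), StructField("PHASE", IntegerType))
)
// `spark` is the active SparkSession (for example, the one provided by spark-shell)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(rows),
  schema
)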
Thanks in advance!
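If I understand your question correctly, you want to populate the CHANGE column as follows: for the dataframe ordered by the count column, for each row, if the value of the PHASE column matches the defined set of rules, set that value in the CHANGE column. If the value does not match the rules, set the latest valid PHASE value in the CHANGE column.
To do so, you can use a user-defined aggregate function that computes CHANGE over a window ordered by the count column.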
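To make the "carry the latest valid phase forward" logic concrete before involving Spark, here is a minimal plain-Scala sketch over an in-memory list. The names `isValidTransition` and `latestValidPhases` are hypothetical and exist only for this illustration; the rules are the four from the question, and the input is the PHASE column of the sample data.

```scala
// Plain-Scala sketch (no Spark): walk an ordered list of phases and
// carry the last valid phase forward. All names here are illustrative.

// True when `phase` is an allowed successor of the last valid phase.
def isValidTransition(lastPhase: Option[Int], phase: Int): Boolean =
  (lastPhase, phase) match {
    case (None, 3)                    => true // phase === 3 and lastPhase.isNull
    case (Some(p), 4) if Set(2, 3)(p) => true // phase === 4 and lastPhase.isin(2, 3)
    case (Some(p), 6) if Set(4, 5)(p) => true // phase === 6 and lastPhase.isin(4, 5)
    case (Some(p), 8) if Set(6, 7)(p) => true // phase === 8 and lastPhase.isin(6, 7)
    case _                            => false
  }

// For each position, the latest valid phase seen so far (None if there is none yet).
def latestValidPhases(phases: Seq[Int]): Seq[Option[Int]] =
  phases
    .scanLeft(Option.empty[Int]) { (last, phase) =>
      if (isValidTransition(last, phase)) Some(phase) else last
    }
    .tail // drop the initial empty state added by scanLeft

val phases = Seq(3, 3, 6, 6, 8, 4, 4, 4) // PHASE column of the sample data
val change = latestValidPhases(phases)
```

On the sample data this yields 3, 3, 3, 3, 3, 4, 4, 4, which is the CHANGE column of the expected output. The Spark version below follows the same idea, with the running state held in an aggregation buffer instead of a `scanLeft` accumulator.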
First, define an Aggregator object whose buffer is the last valid phase, and implement the set of rules in its reduce function:
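import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

object LatestValidPhase extends Aggregator[Integer, Integer, Integer] {
  // The buffer holds the last valid phase; null means none has been seen yet
  def zero: Integer = null

  // Accept `phase` only if it is an allowed successor of `lastPhase`,
  // otherwise keep carrying `lastPhase` forward
  def reduce(lastPhase: Integer, phase: Integer): Integer = {
    if (lastPhase == null) {
      // Guarding null here avoids unboxing a null buffer in the Set checks below
      if (phase == 3) phase else lastPhase
    } else if (Set(2, 3).contains(lastPhase.intValue) && phase == 4) {
      phase
    } else if (Set(4, 5).contains(lastPhase.intValue) && phase == 6) {
      phase
    } else if (Set(6, 7).contains(lastPhase.intValue) && phase == 8) {
      phase
    } else {
      lastPhase
    }
  }

  // Only meaningful over an ordered window, so merging partial buffers is unsupported
  def merge(b1: Integer, b2: Integer): Integer = {
    throw new NotImplementedError("should not use as general aggregation")
  }

  def finish(reduction: Integer): Integer = reduction

  def bufferEncoder: Encoder[Integer] = Encoders.INT
  def outputEncoder: Encoder[Integer] = Encoders.INT
}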
Then transform it into an aggregate user-defined function and apply it over a window ordered by the count column:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}
val latest_valid_phase = udaf(LatestValidPhase)
val window = Window.orderBy("count")
df.withColumn("CHANGE", latest_valid_phase(col("PHASE")).over(window))
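
If the goal is to actually drop the misplaced rows rather than only annotate them, one possible follow-up is to keep the rows where PHASE equals CHANGE. The sketch below is an assumption about what "remove" should mean, not something stated in the question. To stay self-contained it builds a stand-in dataframe from the expected output instead of reusing the result of the withColumn call; `session`, `withChange`, `cleaned` and `keptCounts` are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only; in spark-shell the existing `spark` session can be used instead.
val session = SparkSession.builder()
  .master("local[1]")
  .appName("drop-invalid-phases")
  .getOrCreate()
import session.implicits._

// Stand-in for the dataframe with the CHANGE column (values copied from the expected output).
val withChange = Seq(
  (1, 3, 3), (2, 3, 3), (3, 6, 3), (4, 6, 3),
  (5, 8, 3), (6, 4, 4), (7, 4, 4), (8, 4, 4)
).toDF("count", "PHASE", "CHANGE")

// Keep only the rows whose PHASE equals the carried-forward valid phase, then drop the helper column.
val cleaned = withChange
  .filter(col("PHASE") === col("CHANGE"))
  .drop("CHANGE")

val keptCounts = cleaned.orderBy("count").select("count").as[Int].collect().toSeq
```

With the sample data, the rows with count 3, 4 and 5 (phases 6, 6 and 8) are removed and the rows with count 1, 2, 6, 7 and 8 remain.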