Forward fill in .NET for Apache Spark



I'm looking at window functions for Spark DataFrames in .NET (C#).

I have a DataFrame df with the columns Year, Month, Day, Hour, Minute, ID, Type and Value:

| 2021 |  3  |  4  |  8  |  9  |  87  |  Type1  |  380.5  |
| 2021 |  3  |  4  |  8  |  10 | null |   null  |   null  |
| 2021 |  3  |  4  |  8  |  11 | null |   null  |   null  |
| 2021 |  3  |  4  |  8  |  12 | null |   null  |   null  |
| 2021 |  3  |  4  |  8  |  13 |  87  |  Type1  |    0.0  |
| 2021 |  3  |  4  |  8  |  14 |  87  |  Type1  |    0.0  |

I want to fill the empty (null) rows with the values from the previous row, ordered by Year, Month, Day, Hour and Minute, like this:

| 2021 |  3  |  4  |  8  |  9  |  87  |  Type1  |  380.5  |
| 2021 |  3  |  4  |  8  |  10 |  87  |  Type1  |  380.5  |
| 2021 |  3  |  4  |  8  |  11 |  87  |  Type1  |  380.5  |
| 2021 |  3  |  4  |  8  |  12 |  87  |  Type1  |  380.5  |
| 2021 |  3  |  4  |  8  |  13 |  87  |  Type1  |    0.0  |
| 2021 |  3  |  4  |  8  |  14 |  87  |  Type1  |    0.0  |

So far I have found solutions in Scala that use Window and the lag function, but I'm not sure how to do this in C#. In Scala the window would be defined something like:

val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute")

I would then like to add a newValue column using:

var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]));

How can I define a window in .NET for Spark and use the Lag function to forward-fill null values?

You are very close. To use Lag and Window with .NET for Apache Spark, you need:

// Build the sample DataFrame from the question.
var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
    {
        new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
        new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
        new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
    }, new StructType(new List<StructField>()
    {
        new StructField("Year", new IntegerType()),
        new StructField("Month", new IntegerType()),
        new StructField("Day", new IntegerType()),
        new StructField("Hour", new IntegerType()),
        new StructField("Minute", new IntegerType()),
        new StructField("ID", new IntegerType()),
        new StructField("Type", new StringType()),
        new StructField("Value", new DoubleType()),
    }));

// Order the rows chronologically.
var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Where Value is null, take the Value from the row one position earlier.
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
            Functions.Lag(df["Value"], 1).Over(window))
        .Otherwise(df["Value"]));
filledDataFrame.Show(1000, 10000);

This results in:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|    null|
|2021|    3|  4|   8|    12|null| null| null|    null|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+
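
Only the first null is filled because Lag with an offset of 1 reads the original Value exactly one row back, and for minutes 11 and 12 that row is itself null. The plain C# sketch below does not use Spark at all; it only imitates the two behaviours on an in-memory array to make the difference visible. The class and method names (FillSketch, FillWithLag, FillWithLastNonNull) are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: imitates the two fill strategies on a plain array.
// Nothing here is part of the Microsoft.Spark API.
public static class FillSketch
{
    // Imitates When(IsNull(v), Lag(v, 1)).Otherwise(v):
    // a null is replaced by the ORIGINAL value one row back, which may be null too.
    public static double?[] FillWithLag(IReadOnlyList<double?> values)
    {
        var result = new double?[values.Count];
        for (int i = 0; i < values.Count; i++)
        {
            double? previous = i > 0 ? values[i - 1] : null;
            result[i] = values[i] ?? previous;
        }
        return result;
    }

    // Imitates a forward fill: carry the most recent non-null value forward.
    public static double?[] FillWithLastNonNull(IReadOnlyList<double?> values)
    {
        var result = new double?[values.Count];
        double? lastSeen = null;
        for (int i = 0; i < values.Count; i++)
        {
            if (values[i].HasValue)
            {
                lastSeen = values[i];
            }
            result[i] = lastSeen;
        }
        return result;
    }
}
```

For the Value column above (380.5, null, null, null, 0.0, 0.0), FillWithLag leaves the second and third nulls in place, while FillWithLastNonNull replaces all three with 380.5.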

However, you probably want Last rather than Lag, because Last can skip nulls when its second argument (ignoreNulls) is true:

// Same sample DataFrame as above.
var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
    {
        new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
        new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
        new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
        new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
    }, new StructType(new List<StructField>()
    {
        new StructField("Year", new IntegerType()),
        new StructField("Month", new IntegerType()),
        new StructField("Day", new IntegerType()),
        new StructField("Hour", new IntegerType()),
        new StructField("Minute", new IntegerType()),
        new StructField("ID", new IntegerType()),
        new StructField("Type", new StringType()),
        new StructField("Value", new DoubleType()),
    }));

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");

// Where Value is null, take the last non-null Value seen so far in the window.
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
            Functions.Last(df["Value"], true).Over(window))
        .Otherwise(df["Value"]));
filledDataFrame.Show(1000, 10000);

The result is:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|   380.5|
|2021|    3|  4|   8|    12|null| null| null|   380.5|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+
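
The question also asks for ID and Type to be filled, not only Value. A minimal sketch of one way to do that is to apply Last with ignoreNulls set to true to each column in turn. The helper name FillForward and its class are hypothetical, and the sketch assumes the columns are overwritten in place rather than written to new columns. Because an ordered window without an explicit frame runs from the start of the partition up to and including the current row, Last with ignoreNulls already returns the row's own value when it is non-null, so the When/Otherwise wrapper should not be needed here.

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;

public static class ForwardFill
{
    // Hypothetical helper: overwrites each named column with the most recent
    // non-null value found in the window up to and including the current row.
    public static DataFrame FillForward(
        DataFrame df, WindowSpec window, params string[] columnNames)
    {
        DataFrame result = df;
        foreach (string name in columnNames)
        {
            // ignoreNulls = true makes Last skip null entries in the frame.
            result = result.WithColumn(
                name,
                Functions.Last(Functions.Col(name), true).Over(window));
        }
        return result;
    }
}
```

It could be called as ForwardFill.FillForward(df, Window.OrderBy("Year", "Month", "Day", "Hour", "Minute"), "ID", "Type", "Value"). Note that a window with no partitioning makes Spark process all rows in a single partition, which may be slow on large data.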

Hope that helps!

ed

(The using statements needed to make it work:)

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;
