Time-efficient gap filling in a DataFrame using .NET for Spark



I want to fill the gaps in my DataFrame using .NET for Spark.

The current DataFrame (rawData) contains data at one-minute intervals between reportFrom and reportTo:

DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);

Some intervals are missing, and I want to fill them with the last known value.

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
...
|2021|    3|  4|  22|     7|                87|               Power|               0.0|

After the first step (inserting the missing minutes), the result I expect is:

+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id                |                Type|             Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021|    3|  4|   0|     0|                87|               Power|               0.0|
|2021|    3|  4|   0|     1|                87|               Power|               0.0|
|2021|    3|  4|   0|     2|                87|               Power|               0.0|
...
|2021|    3|  4|  14|     2|                87|               Power|             380.0|
|2021|    3|  4|  14|     3|                87|               Power|             380.0|
|2021|    3|  4|  14|     4|                87|               Power|             380.0|
|2021|    3|  4|  14|     5|                87|               Power|             380.0|
|2021|    3|  4|  14|     6|              null|                null|              null|
|2021|    3|  4|  14|     7|                87|               Power|             380.0|
|2021|    3|  4|  14|     8|              null|                null|              null|
...
|2021|    3|  4|  23|    59|              null|                null|              null|

So far, I have been creating a new DataFrame that contains every minute and then performing a left outer join on the two DataFrames:

int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)
{
    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
}

// Seed row with a dummy year (-1); it is filtered out again after the loop
var toFillTime0 = new List<object> { -1, 0, 0, 0, 0 };
var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(toFillTime0.ToArray()) },
    new StructType(                     // schema
        new List<StructField>()
        {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
        }));

// One single-row DataFrame per minute, unioned onto the seed
foreach (DateTime time in timeList)
{
    var toFillTime = new List<object> { time.Year, time.Month, time.Day, time.Hour, time.Minute };
    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow> { new GenericRow(toFillTime.ToArray()) },
        new StructType(                     // schema
            new List<StructField>()
            {
                new StructField("Year0", new IntegerType()),
                new StructField("Month0", new IntegerType()),
                new StructField("Day0", new IntegerType()),
                new StructField("Hour0", new IntegerType()),
                new StructField("Minute0", new IntegerType()),
            }));
    dataToFill = dataToFill.Union(dataToFillt);
}
dataToFill = dataToFill.Filter("Year0 > 0");

var toFillReportDataReq = dataToFill.Join(rawData,
    dataToFill["Year0"] == rawData["Year"] & dataToFill["Month0"] == rawData["Month"] & dataToFill["Day0"] == rawData["Day"]
    & dataToFill["Hour0"] == rawData["Hour"] & dataToFill["Minute0"] == rawData["Minute"], "left_outer");

A few rows of toFillReportDataReq are shown below:

|2021|    3|  4|  22|     4|                87|               Power|               0.0|
|2021|    3|  4|  22|     5|                87|               Power|               0.0|
|2021|    3|  4|  22|     6|                87|               Power|               0.0|
|2021|    3|  4|  22|     7|                87|               Power|               0.0|
|2021|    3|  4|  22|     8|              null|                null|              null|
|2021|    3|  4|  22|     9|              null|                null|              null|
|2021|    3|  4|  22|    10|              null|                null|              null|
|2021|    3|  4|  22|    11|              null|                null|              null|
|2021|    3|  4|  22|    12|              null|                null|              null|
|2021|    3|  4|  22|    13|              null|                null|              null|
|2021|    3|  4|  22|    14|              null|                null|              null|

Replacing the nulls in Value is already handled with the Window and Last functions.

The nulls in Id and Type are replaced with var id = 87 and "Power", using:

toFillReportDataReq = toFillReportDataReq.WithColumn("Id", Functions.When(toFillReportDataReq["Id"].IsNull(), id)
.Otherwise(toFillReportDataReq["Id"]))
.WithColumn("Type", Functions.When(toFillReportDataReq["Type"].IsNull(), "Power")
.Otherwise(toFillReportDataReq["Type"]));

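This approach returns the result I want, but it is very time-consuming (inefficient).

My questions are:

  • Is there a more efficient way to create a new DataFrame that contains every minute in the specified interval?
  • Is there a way to avoid the join in this approach?
  • What is the best way to set all values in the Id column to id and all values in the Type column to "Power"?

Thanks!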

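This is the approach I would take:

  1. Build a DataFrame with one row for every minute (I used spark.Range to project one row for each minute I need)
  2. For each id in the range, add that many minutes to the start date
  3. Join the dates to the original DataFrame with a left_outer join so that no rows are lost
  4. Then use Last to fill any gaps. Note that if the data starts with nulls, newValue will be null until the first data point appears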

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace StackOverflow
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();

            //Sample data set - we will fill in the missing minutes
            var df = spark.CreateDataFrame(new List<GenericRow>()
            {
                new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Type1", 380.5}),
                new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})
            }, new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            }));

            //start and end time
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);

            //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
            var unixFromTime = (reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;

            //how many total rows do we need?
            var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;

            //create a dataframe with 1 row for every minute we need
            var everyMinute = spark.Range((long) minutesToCreate);

            //Add the reportFrom unix epoch
            everyMinute = everyMinute.WithColumn("BaseTime", Functions.Lit(unixFromTime));

            //add to the unix epoch, the Id (incrementing number) multiplied by 60 - if we didn't mul(60) it would add seconds and not minutes
            everyMinute = everyMinute.WithColumn("Time",
                Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("Id").Cast("Int").Multiply(Functions.Lit(60))));

            //convert the unix epoch to an actual timestamp and drop all the intermediate columns
            everyMinute = everyMinute.WithColumn("Date",
                Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");

            //convert timestamp into individual columns
            everyMinute = everyMinute.WithColumn("Year", Functions.Year(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Month", Functions.Month(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Hour", Functions.Hour(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Minute", Functions.Minute(Functions.Col("Date")));

            //join both data frames so...
            var dfAllData = everyMinute.Join(df, new List<string>() {"Year", "Month", "Day", "Hour", "Minute"}, "left_outer");

            //add in data using Last
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
            var filledDataFrame = dfAllData.WithColumn("newValue",
                Functions.When(dfAllData["Value"].IsNull(),
                        Functions.Last(dfAllData["Value"], true).Over(window))
                    .Otherwise(dfAllData["Value"]));
            filledDataFrame.Show(1000, 10000);
        }
    }
}
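
The program above does not cover the third question (setting Id and Type on the inserted rows). One possible variation is sketched below, not a tested or definitive implementation. It assumes Spark 2.4 or later, so that the SQL functions sequence and explode can build the minute grid in a single expression, and it uses Functions.Coalesce to replace nulls with constants. The sample rows, the class name FillConstantsSketch, and the hard-coded values 87 and "Power" are illustrative assumptions taken from the question.

```csharp
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace StackOverflow
{
    // Hypothetical class name, used only for this sketch
    class FillConstantsSketch
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();

            // Illustrative sample: two known readings with missing minutes around them
            var rawData = spark.CreateDataFrame(new List<GenericRow>()
            {
                new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Power", 380.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 6, 87, "Power", 0.0})
            }, new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("Id", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            }));

            // One row per minute. sequence() includes both end points,
            // so this yields 08:00 through 08:09 (ten rows).
            var everyMinute = spark.Sql(
                "SELECT explode(sequence(" +
                "to_timestamp('2021-03-04 08:00:00'), " +
                "to_timestamp('2021-03-04 08:09:00'), " +
                "interval 1 minute)) AS Date");

            // Split the timestamp into the same key columns as rawData
            everyMinute = everyMinute
                .WithColumn("Year", Functions.Year(Functions.Col("Date")))
                .WithColumn("Month", Functions.Month(Functions.Col("Date")))
                .WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")))
                .WithColumn("Hour", Functions.Hour(Functions.Col("Date")))
                .WithColumn("Minute", Functions.Minute(Functions.Col("Date")))
                .Drop("Date");

            var joined = everyMinute.Join(rawData,
                new List<string>() {"Year", "Month", "Day", "Hour", "Minute"}, "left_outer");

            // Frame from the first row up to the current row, ordered by time
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute")
                .RowsBetween(Window.UnboundedPreceding, Window.CurrentRow);

            var filled = joined
                // Coalesce keeps the existing value and falls back to the constant when it is null
                .WithColumn("Id", Functions.Coalesce(Functions.Col("Id"), Functions.Lit(87)))
                .WithColumn("Type", Functions.Coalesce(Functions.Col("Type"), Functions.Lit("Power")))
                // Last non-null Value seen so far; stays null before the first reading
                .WithColumn("Value", Functions.Last(Functions.Col("Value"), true).Over(window));

            filled.OrderBy("Year", "Month", "Day", "Hour", "Minute").Show(20);
        }
    }
}
```

Because Last with ignoreNulls set to true already returns the current value when it is not null, the When/Otherwise wrapper is not needed in this variation. The window has no partitioning, so Spark evaluates it in a single partition; that is acceptable for one day of minutes but may need a PartitionBy for larger data sets.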

Ed
