How do I interpolate a PySpark dataframe within grouped data?
For example:
I have a PySpark dataframe with the following columns:
+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+-------------------+--------+
I need to interpolate the counts for John and Mo to one data point per minute, within each of their own intervals. I'm open to any simple linear interpolation, but note that my real data has a point every few seconds and I want to interpolate down to every second.
So the result should be:
+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:01:00|62      |
|John    |2018-02-01 03:02:00|64      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:04:00|68      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:06:00|72      |
|John    |2018-02-01 03:07:00|74      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:06:00|15      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:08:00|25      |
|Mo      |2017-06-04 01:09:00|30      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+-------------------+--------+
The new rows need to be added to my original dataframe. I'm looking for a PySpark solution.
If you use Python, the shortest way to get this done is to reuse existing Pandas functions with a GROUPED_MAP udf:
from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _
Applied to your data:
from pyspark.sql.functions import to_timestamp
df = spark.createDataFrame([
    ("John", "2018-02-01 03:00:00", 60),
    ("John", "2018-02-01 03:03:00", 66),
    ("John", "2018-02-01 03:05:00", 70),
    ("John", "2018-02-01 03:08:00", 76),
    ("Mo", "2017-06-04 01:05:00", 10),
    ("Mo", "2017-06-04 01:07:00", 20),
    ("Mo", "2017-06-04 01:10:00", 35),
    ("Mo", "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn(
    "timestamp", to_timestamp("timestamp")
)
df.groupBy("webID").apply(resample(df.schema, "60S")).show()
it produces:
+------+-------------------+-----+
|counts|          timestamp|webID|
+------+-------------------+-----+
|    60|2018-02-01 03:00:00| John|
|    62|2018-02-01 03:01:00| John|
|    64|2018-02-01 03:02:00| John|
|    66|2018-02-01 03:03:00| John|
|    68|2018-02-01 03:04:00| John|
|    70|2018-02-01 03:05:00| John|
|    72|2018-02-01 03:06:00| John|
|    74|2018-02-01 03:07:00| John|
|    76|2018-02-01 03:08:00| John|
|    10|2017-06-04 01:05:00|   Mo|
|    15|2017-06-04 01:06:00|   Mo|
|    20|2017-06-04 01:07:00|   Mo|
|    25|2017-06-04 01:08:00|   Mo|
|    30|2017-06-04 01:09:00|   Mo|
|    35|2017-06-04 01:10:00|   Mo|
|    40|2017-06-04 01:11:00|   Mo|
+------+-------------------+-----+
This works under the assumption that both the input and the interpolated data for a single webID fit in the memory of a single node (in general, other exact, non-iterative solutions have to make similar assumptions). If that is not the case, you can easily approximate it by using overlapping windows
from pyspark.sql.functions import window

partial = (df
    .groupBy("webID", window("timestamp", "5 minutes", "3 minutes")["start"])
    .apply(resample(df.schema, "60S")))
and aggregating the final result
from pyspark.sql.functions import mean

(partial
    .groupBy("webID", "timestamp")
    .agg(mean("counts").alias("counts"))
    # Order by key and timestamp, only for consistent presentation
    .orderBy("webID", "timestamp")
    .show())
This is of course much more expensive (there are two shuffles, and some values are computed multiple times), but it can also leave gaps if the overlap is not large enough to include the next observation, as in the output below (a rough sketch of widening the overlap follows it).
+-----+-------------------+------+
|webID|          timestamp|counts|
+-----+-------------------+------+
| John|2018-02-01 03:00:00|  60.0|
| John|2018-02-01 03:01:00|  62.0|
| John|2018-02-01 03:02:00|  64.0|
| John|2018-02-01 03:03:00|  66.0|
| John|2018-02-01 03:04:00|  68.0|
| John|2018-02-01 03:05:00|  70.0|
| John|2018-02-01 03:08:00|  76.0|
|   Mo|2017-06-04 01:05:00|  10.0|
|   Mo|2017-06-04 01:06:00|  15.0|
|   Mo|2017-06-04 01:07:00|  20.0|
|   Mo|2017-06-04 01:08:00|  25.0|
|   Mo|2017-06-04 01:09:00|  30.0|
|   Mo|2017-06-04 01:10:00|  35.0|
|   Mo|2017-06-04 01:11:00|  40.0|
+-----+-------------------+------+
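One way to reduce those gaps (a rough sketch under the same assumptions, not part of the original answer) is to make the windows wider relative to the slide, so that consecutive observations are more likely to fall into the same window; this recomputes even more values and still does not guarantee a gap-free result:

from pyspark.sql.functions import window

# Same pattern as above, but with 10-minute windows sliding every 3 minutes,
# i.e. a larger overlap between neighbouring windows than 5 minutes / 3 minutes.
partial = (df
    .groupBy("webID", window("timestamp", "10 minutes", "3 minutes")["start"])
    .apply(resample(df.schema, "60S")))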
A native PySpark implementation (no udf) that solves this problem:
import pyspark.sql.functions as F
resample_interval = 1 # Resample interval size in seconds
df_interpolated = (
    df_data
    # Get timestamp and Counts of previous measurement via window function
    .selectExpr(
        "webID",
        "LAG(Timestamp) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousTimestamp",
        "Timestamp as NextTimestamp",
        "LAG(Counts) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousCounts",
        "Counts as NextCounts",
    )
    # To determine the resample interval, round the start up and the end down to the nearest interval boundary
    .withColumn("PreviousTimestampRoundUp", F.expr(f"to_timestamp(ceil(unix_timestamp(PreviousTimestamp)/{resample_interval})*{resample_interval})"))
    .withColumn("NextTimestampRoundDown", F.expr(f"to_timestamp(floor(unix_timestamp(NextTimestamp)/{resample_interval})*{resample_interval})"))
    # Make sure we don't get any negative intervals (whole interval is within resample interval)
    .filter("PreviousTimestampRoundUp<=NextTimestampRoundDown")
    # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
    .withColumn("Timestamp", F.expr(f"explode(sequence(PreviousTimestampRoundUp, NextTimestampRoundDown, interval {resample_interval} second)) as Timestamp"))
    # Sequence has inclusive boundaries for both start and stop. Filter out duplicate Counts if original timestamp is exactly a boundary.
    .filter("Timestamp<NextTimestamp")
    # Interpolate Counts between previous and next
    .selectExpr(
        "webID",
        "Timestamp",
        """(unix_timestamp(Timestamp)-unix_timestamp(PreviousTimestamp))
           /(unix_timestamp(NextTimestamp)-unix_timestamp(PreviousTimestamp))
           *(NextCounts-PreviousCounts)
           +PreviousCounts
           as Counts""",
    )
)
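For context, a minimal way to set up the df_data this snippet expects (an assumption on my side, not part of the original answer) is just the question's sample data. Spark SQL resolves column names case-insensitively by default, so the Timestamp/Counts spellings in the expressions above match timestamp/counts here; using resample_interval = 60 instead of 1 reproduces the per-minute output from the question. Define df_data first, then run the block above and inspect the result:

from pyspark.sql.functions import to_timestamp

# Hypothetical setup: the question's sample rows as df_data.
df_data = spark.createDataFrame([
    ("John", "2018-02-01 03:00:00", 60),
    ("John", "2018-02-01 03:03:00", 66),
    ("John", "2018-02-01 03:05:00", 70),
    ("John", "2018-02-01 03:08:00", 76),
    ("Mo",   "2017-06-04 01:05:00", 10),
    ("Mo",   "2017-06-04 01:07:00", 20),
    ("Mo",   "2017-06-04 01:10:00", 35),
    ("Mo",   "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn("timestamp", to_timestamp("timestamp"))

# Note: the strict Timestamp<NextTimestamp filter appears to drop the last
# original sample of each group; union it back if you need it.
df_interpolated.orderBy("webID", "Timestamp").show()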
I recently wrote a blog post explaining this method and showing that it scales much better for large datasets than the Pandas udf approach above: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1
Not a Python solution, but I suppose the Scala solution below could be implemented in Python with a similar approach. It involves using the lag Window function to create a time range in every row, and a UDF that expands that time range into a list of per-minute timestamps and interpolated counts via the java.time API, which is then flattened with Spark's explode method (a rough PySpark sketch of the same idea follows the Scala output below):
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  ("John", "2018-02-01 03:00:00", 60),
  ("John", "2018-02-01 03:03:00", 66),
  ("John", "2018-02-01 03:05:00", 70),
  ("Mo", "2017-06-04 01:07:00", 20),
  ("Mo", "2017-06-04 01:10:00", 35),
  ("Mo", "2017-06-04 01:11:00", 40)
).toDF("webID", "timestamp", "count")

val winSpec = Window.partitionBy($"webID").orderBy($"timestamp")

def minuteList(timePattern: String) = udf{ (ts1: String, ts2: String, c1: Int, c2: Int) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val timeFormat = DateTimeFormatter.ofPattern(timePattern)
  val perMinTS = if (ts1 == ts2) Vector(ts1) else {
    val t1 = LocalDateTime.parse(ts1, timeFormat)
    val t2 = LocalDateTime.parse(ts2, timeFormat)
    Iterator.iterate(t1.plusMinutes(1))(_.plusMinutes(1)).takeWhile(! _.isAfter(t2)).
      map(_.format(timeFormat)).
      toVector
  }
  val sz = perMinTS.size
  val perMinCount = for { i <- 1 to sz } yield c1 + ((c2 - c1) * i / sz)

  perMinTS zip perMinCount
}

df.
  withColumn("timestampPrev", when(row_number.over(winSpec) === 1, $"timestamp").
    otherwise(lag($"timestamp", 1).over(winSpec))).
  withColumn("countPrev", when(row_number.over(winSpec) === 1, $"count").
    otherwise(lag($"count", 1).over(winSpec))).
  withColumn("minuteList",
    minuteList("yyyy-MM-dd HH:mm:ss")($"timestampPrev", $"timestamp", $"countPrev", $"count")).
  withColumn("minute", explode($"minuteList")).
  select($"webID", $"minute._1".as("timestamp"), $"minute._2".as("count")).
  show
// +-----+-------------------+-----+
// |webID|          timestamp|count|
// +-----+-------------------+-----+
// | John|2018-02-01 03:00:00|   60|
// | John|2018-02-01 03:01:00|   62|
// | John|2018-02-01 03:02:00|   64|
// | John|2018-02-01 03:03:00|   66|
// | John|2018-02-01 03:04:00|   68|
// | John|2018-02-01 03:05:00|   70|
// |   Mo|2017-06-04 01:07:00|   20|
// |   Mo|2017-06-04 01:08:00|   25|
// |   Mo|2017-06-04 01:09:00|   30|
// |   Mo|2017-06-04 01:10:00|   35|
// |   Mo|2017-06-04 01:11:00|   40|
// +-----+-------------------+-----+
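As a rough PySpark sketch of the same lag + expand + explode idea (an illustration under assumptions, not the answer's code; the per_minute name, return types, and the coalesce handling of first rows are mine), reusing the df defined in the earlier Python example:

from datetime import timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType, StructField, StructType, TimestampType
from pyspark.sql.window import Window

@F.udf(ArrayType(StructType([
    StructField("timestamp", TimestampType()),
    StructField("counts", LongType()),
])))
def per_minute(ts_prev, ts, c_prev, c):
    # Expand (ts_prev, ts] into per-minute points with linearly interpolated counts;
    # a row that is its own predecessor (the first row per group) yields just itself.
    if ts_prev == ts:
        return [(ts, c)]
    n = int((ts - ts_prev).total_seconds() // 60)
    return [(ts_prev + timedelta(minutes=i), c_prev + (c - c_prev) * i // n)
            for i in range(1, n + 1)]

w = Window.partitionBy("webID").orderBy("timestamp")

(df
    .withColumn("timestampPrev", F.coalesce(F.lag("timestamp").over(w), F.col("timestamp")))
    .withColumn("countsPrev", F.coalesce(F.lag("counts").over(w), F.col("counts")))
    .withColumn("point", F.explode(per_minute("timestampPrev", "timestamp", "countsPrev", "counts")))
    .select("webID", F.col("point.timestamp").alias("timestamp"), F.col("point.counts").alias("counts"))
    .show())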
I extended @David's great answer to make it dynamic and reusable on wider dataframes.
It takes the group and time column names as input variables:
import pyspark.sql.functions as f
resample_interval = 1 # Resample interval size in seconds
group_str = 'webID' # name of group column
time_str = 'timestamp' # name of timestamp column
It then dynamically detects which other columns exist in the dataframe, and dynamically builds the select and interpolation expressions that David proposed:
# extract columns to interpolate
interpol_col_str = df_data.drop(group_str, time_str).columns

# create select expression to get interpolation columns and previous measurement via window function
col_create_expr = []
for col in interpol_col_str:
    col_create_expr.extend([
        f"LAG({col}) OVER (PARTITION BY {group_str} ORDER BY {time_str} ASC) as Previous{col}",
        f"{col} as Next{col}"
    ])

# create interpolation expression for each interpolation column
interpol_expr = []
for col in interpol_col_str:
    interpol_expr.extend([
        f"""(unix_timestamp({time_str})-unix_timestamp(Previous{time_str}))
            /(unix_timestamp(Next{time_str})-unix_timestamp(Previous{time_str}))
            *(Next{col}-Previous{col})
            +Previous{col}
            as {col}"""
    ])
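To make this concrete: for the question's dataframe, where counts is the only column to interpolate, the generated lists come out roughly as follows (whitespace compressed for readability):

print(col_create_expr)
# ['LAG(counts) OVER (PARTITION BY webID ORDER BY timestamp ASC) as Previouscounts',
#  'counts as Nextcounts']

print(interpol_expr)
# ['(unix_timestamp(timestamp)-unix_timestamp(Previoustimestamp))
#   /(unix_timestamp(Nexttimestamp)-unix_timestamp(Previoustimestamp))
#   *(Nextcounts-Previouscounts)+Previouscounts as counts']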
These expressions can then be added to the interpolation query by unpacking the lists with *, i.e. *col_create_expr and *interpol_expr:
df_interpolated = (
    df_data
    # Get timestamp and interpolation columns of previous measurement via window function
    .selectExpr(
        f"{group_str}",
        f"LAG({time_str}) OVER (PARTITION BY {group_str} ORDER BY {time_str} ASC) as Previous{time_str}",
        f"{time_str} as Next{time_str}",
        *col_create_expr
    )
    # To determine the resample interval, round the start up and the end down to the nearest interval boundary
    .withColumn(f"Previous{time_str}RoundUp", f.expr(f"to_timestamp(ceil(unix_timestamp(Previous{time_str})/{resample_interval})*{resample_interval})"))
    .withColumn(f"Next{time_str}RoundDown", f.expr(f"to_timestamp(floor(unix_timestamp(Next{time_str})/{resample_interval})*{resample_interval})"))
    # Make sure we don't get any negative intervals (whole interval is within resample interval)
    .filter(f"Previous{time_str}RoundUp<=Next{time_str}RoundDown")
    # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
    .withColumn(f"{time_str}", f.expr(f"explode(sequence(Previous{time_str}RoundUp, Next{time_str}RoundDown, interval {resample_interval} second)) as {time_str}"))
    # Sequence has inclusive boundaries for both start and stop. Filter out duplicate values if the original timestamp is exactly a boundary.
    .filter(f"{time_str}<Next{time_str}")
    # Interpolate each column between previous and next
    .selectExpr(
        f"{group_str}",
        f"{time_str}",
        *interpol_expr
    )
)
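As a quick usage sketch (hypothetical data, not from the original answer): every column other than the group and timestamp columns is picked up and interpolated automatically, for example with an extra errors column:

# Hypothetical df_data with an additional errors column; interpol_col_str will
# detect both counts and errors and interpolate them.
df_data = spark.createDataFrame([
    ("John", "2018-02-01 03:00:00", 60, 1),
    ("John", "2018-02-01 03:03:00", 66, 4),
    ("Mo",   "2017-06-04 01:05:00", 10, 0),
    ("Mo",   "2017-06-04 01:07:00", 20, 2),
], ("webID", "timestamp", "counts", "errors")).withColumn(
    "timestamp", f.to_timestamp("timestamp")
)

# Re-run the expression-building and df_interpolated snippets above
# (resample_interval = 60 gives the per-minute granularity from the question):
df_interpolated.orderBy(group_str, time_str).show()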