如何在 PySpark 中的分组对象中插入列



如何在分组数据中插入 PySpark 数据帧?

例如:

我有一个包含以下列的 PySpark 数据帧:

+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+----------------- -+--------+
我需要在 John 和 Mo

自己的间隔内每分钟将 John 和 Mo 的计数数据插值到一个数据点。我对任何简单的线性插值持开放态度 - 但请注意,我的真实数据是每隔几秒钟,我想插值到每秒。

所以结果应该是:

+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:01:00|62      |
|John    |2018-02-01 03:02:00|64      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:04:00|68      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:06:00|72      |
|John    |2018-02-01 03:07:00|74      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:06:00|15      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:08:00|25      |
|Mo      |2017-06-04 01:09:00|30      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+----------------- -+--------+

需要将新行添加到我的原始数据帧。寻找 PySpark 解决方案。

如果你使用Python,完成工作的最短方法是重用现有的Pandas函数,GROUPED_MAP udf:

from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType
def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))), 
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _

应用于您的数据:

from pyspark.sql.functions import to_timestamp
df = spark.createDataFrame([
    ("John",   "2018-02-01 03:00:00", 60),  
    ("John",   "2018-02-01 03:03:00", 66),  
    ("John",   "2018-02-01 03:05:00", 70),  
    ("John",   "2018-02-01 03:08:00", 76),  
    ("Mo",     "2017-06-04 01:05:00", 10),  
    ("Mo",     "2017-06-04 01:07:00", 20),  
    ("Mo",     "2017-06-04 01:10:00", 35),  
    ("Mo",     "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn(
  "timestamp", to_timestamp("timestamp")
)
df.groupBy("webID").apply(resample(df.schema, "60S")).show()

它产生

+------+-------------------+-----+
|counts|          timestamp|webID|
+------+-------------------+-----+
|    60|2018-02-01 03:00:00| John|
|    62|2018-02-01 03:01:00| John|
|    64|2018-02-01 03:02:00| John|
|    66|2018-02-01 03:03:00| John|
|    68|2018-02-01 03:04:00| John|
|    70|2018-02-01 03:05:00| John|
|    72|2018-02-01 03:06:00| John|
|    74|2018-02-01 03:07:00| John|
|    76|2018-02-01 03:08:00| John|
|    10|2017-06-04 01:05:00|   Mo|
|    15|2017-06-04 01:06:00|   Mo|
|    20|2017-06-04 01:07:00|   Mo|
|    25|2017-06-04 01:08:00|   Mo|
|    30|2017-06-04 01:09:00|   Mo|
|    35|2017-06-04 01:10:00|   Mo|
|    40|2017-06-04 01:11:00|   Mo|
+------+-------------------+-----+

这是在假设单个webID的输入和插值数据都可以放入单个节点的内存的情况下工作的(通常,其他精确的非迭代解决方案必须做出类似的假设(。如果不是这种情况,您可以通过采用重叠窗口轻松近似

partial = (df
    .groupBy("webID", window("timestamp", "5 minutes", "3 minutes")["start"])
    .apply(resample(df.schema, "60S")))

并汇总最终结果

from pyspark.sql.functions import mean
(partial
    .groupBy("webID", "timestamp")
    .agg(mean("counts")
    .alias("counts"))
    # Order by key and timestamp, only for consistent presentation
    .orderBy("webId", "timestamp")
    .show())

这当然要昂贵得多(有两个洗牌,有些值会被多次计算(,但如果重叠不够大,无法包含下一个观察结果,也会留下间隙。

+-----+-------------------+------+
|webID|          timestamp|counts|
+-----+-------------------+------+
| John|2018-02-01 03:00:00|  60.0|
| John|2018-02-01 03:01:00|  62.0|
| John|2018-02-01 03:02:00|  64.0|
| John|2018-02-01 03:03:00|  66.0|
| John|2018-02-01 03:04:00|  68.0|
| John|2018-02-01 03:05:00|  70.0|
| John|2018-02-01 03:08:00|  76.0|
|   Mo|2017-06-04 01:05:00|  10.0|
|   Mo|2017-06-04 01:06:00|  15.0|
|   Mo|2017-06-04 01:07:00|  20.0|
|   Mo|2017-06-04 01:08:00|  25.0|
|   Mo|2017-06-04 01:09:00|  30.0|
|   Mo|2017-06-04 01:10:00|  35.0|
|   Mo|2017-06-04 01:11:00|  40.0|
+-----+-------------------+------+

解决此问题的本机 pyspark 实现(没有 udf(是:

import pyspark.sql.functions as F
resample_interval = 1  # Resample interval size in seconds
df_interpolated = (
  df_data
  # Get timestamp and Counts of previous measurement via window function
  .selectExpr(
    "webID",
    "LAG(Timestamp) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousTimestamp",
    "Timestamp as NextTimestamp",
    "LAG(Counts) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousCounts",
    "Counts as NextCounts",
  )
  # To determine resample interval round up start and round down end timeinterval to nearest interval boundary
  .withColumn("PreviousTimestampRoundUp", F.expr(f"to_timestamp(ceil(unix_timestamp(PreviousTimestamp)/{resample_interval})*{resample_interval})"))
  .withColumn("NextTimestampRoundDown", F.expr(f"to_timestamp(floor(unix_timestamp(NextTimestamp)/{resample_interval})*{resample_interval})"))
  # Make sure we don't get any negative intervals (whole interval is within resample interval)
  .filter("PreviousTimestampRoundUp<=NextTimestampRoundDown")
  # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
  .withColumn("Timestamp", F.expr(f"explode(sequence(PreviousTimestampRoundUp, NextTimestampRoundDown, interval {resample_interval} second)) as Timestamp"))
  # Sequence has inclusive boundaries for both start and stop. Filter out duplicate Counts if original timestamp is exactly a boundary.
  .filter("Timestamp<NextTimestamp")
  # Interpolate Counts between previous and next
  .selectExpr(
    "webID",
    "Timestamp", 
    """(unix_timestamp(Timestamp)-unix_timestamp(PreviousTimestamp))
        /(unix_timestamp(NextTimestamp)-unix_timestamp(PreviousTimestamp))
        *(NextCounts-PreviousCounts) 
        +PreviousCounts
        as Counts"""
  )
)

我最近写了一篇博客文章来解释这种方法,并表明与上面的熊猫udf方法相比,这种方法对于大数据集的扩展性要好得多 https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1:

这不是

一个Python的解决方案,但我想下面的Scala解决方案可以在Python中使用类似的方法实现。 它涉及使用 lag Window 函数在每行中创建一个时间范围,以及一个 UDF,该 UDF 通过 java.time API 将时间范围扩展为per-minute时间序列和插值计数的列表,然后使用 Spark 的 explode 方法将其展平:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = Seq(
  ("John", "2018-02-01 03:00:00", 60),
  ("John", "2018-02-01 03:03:00", 66),
  ("John", "2018-02-01 03:05:00", 70),
  ("Mo", "2017-06-04 01:07:00", 20),
  ("Mo", "2017-06-04 01:10:00", 35),
  ("Mo", "2017-06-04 01:11:00", 40)
).toDF("webID", "timestamp", "count")
val winSpec = Window.partitionBy($"webID").orderBy($"timestamp")
def minuteList(timePattern: String) = udf{ (ts1: String, ts2: String, c1: Int, c2: Int) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter
  val timeFormat = DateTimeFormatter.ofPattern(timePattern)
  val perMinTS = if (ts1 == ts2) Vector(ts1) else {
      val t1 = LocalDateTime.parse(ts1, timeFormat)
      val t2 = LocalDateTime.parse(ts2, timeFormat)
      Iterator.iterate(t1.plusMinutes(1))(_.plusMinutes(1)).takeWhile(! _.isAfter(t2)).
        map(_.format(timeFormat)).
        toVector
    }
  val sz = perMinTS.size
  val perMinCount = for { i <- 1 to sz } yield c1 + ((c2 - c1) * i / sz)
  perMinTS zip perMinCount
}
df.
  withColumn("timestampPrev", when(row_number.over(winSpec) === 1, $"timestamp").
    otherwise(lag($"timestamp", 1).over(winSpec))).
  withColumn("countPrev", when(row_number.over(winSpec) === 1, $"count").
    otherwise(lag($"count", 1).over(winSpec))).
  withColumn("minuteList",
    minuteList("yyyy-MM-dd HH:mm:ss")($"timestampPrev", $"timestamp", $"countPrev", $"count")).
  withColumn("minute", explode($"minuteList")).
  select($"webID", $"minute._1".as("timestamp"), $"minute._2".as("count")).
  show
// +-----+-------------------+-----+
// |webID|          timestamp|count|
// +-----+-------------------+-----+
// | John|2018-02-01 03:00:00|   60|
// | John|2018-02-01 03:01:00|   62|
// | John|2018-02-01 03:02:00|   64|
// | John|2018-02-01 03:03:00|   66|
// | John|2018-02-01 03:04:00|   68|
// | John|2018-02-01 03:05:00|   70|
// |   Mo|2017-06-04 01:07:00|   20|
// |   Mo|2017-06-04 01:08:00|   25|
// |   Mo|2017-06-04 01:09:00|   30|
// |   Mo|2017-06-04 01:10:00|   35|
// |   Mo|2017-06-04 01:11:00|   40|
// +-----+-------------------+-----+

我扩展了@David的伟大答案,并使其动态地在更广泛的数据帧上使用它并使其可重用。

它将组和时间列名称作为输入变量:

import pyspark.sql.functions as f
resample_interval = 1  # Resample interval size in seconds
group_str = 'webID'    # name of group column
time_str = 'timestamp' # name of timestamp column
然后动态

检测数据帧中存在哪些其他列,然后动态创建 David 提出的选择表达式和插值表达式:

# extract columns to interpolate
interpol_col_str = df_data.drop(group_str, time_str).columns
# create select expression to get interpolation columns and previous measurement via window function
col_create_expr = []
for col in interpol_col_str:
  col_create_expr.extend([
  f"LAG({col}) OVER (PARTITION BY {group_str} ORDER BY {time_str} ASC) as Previous{col}",
  f"{col} as Next{col}"
  ])
# create interpolation expression for each interpolation column
interpol_expr = []
for col in interpol_col_str:
  interpol_expr.extend([
      f"""(unix_timestamp({time_str})-unix_timestamp(Previous{time_str}))
        /(unix_timestamp(Next{time_str})-unix_timestamp(Previous{time_str}))
        *(Next{col}-Previous{col}) 
        +Previous{col}
        as {col}"""]
  )

表达式可以通过解压缩列表来添加到插值方法中,* -> *col_create_expr*interpol_expr

df_interpolated = (
  df_data
  # Get timestamp and interpolation columns of previous measurement via window function
  .selectExpr(
    f"{group_str}",
    f"LAG({time_str}) OVER (PARTITION BY {group_str} ORDER BY {time_str} ASC) as Previous{time_str}",
    f"{time_str} as Next{time_str}",
    *col_create_expr
  )
  # To determine resample interval round up start and round down end timeinterval to nearest interval boundary
  .withColumn(f"Previous{time_str}RoundUp", f.expr(f"to_timestamp(ceil(unix_timestamp(Previous{time_str})/{resample_interval})*{resample_interval})"))
  .withColumn(f"Next{time_str}RoundDown", f.expr(f"to_timestamp(floor(unix_timestamp(Next{time_str})/{resample_interval})*{resample_interval})"))
  # Make sure we don't get any negative intervals (whole interval is within resample interval)
  .filter(f"Previous{time_str}RoundUp<=Next{time_str}RoundDown")
  # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
  .withColumn(f"{time_str}", f.expr(f"explode(sequence(Previous{time_str}RoundUp, Next{time_str}RoundDown, interval {resample_interval} second)) as {time_str}"))
  # Sequence has inclusive boundaries for both start and stop. Filter out duplicate {column_str} if original timestamp is exactly a boundary.
  .filter(f"{time_str}<Next{time_str}")
  # Interpolate {column_str} between previous and next
  .selectExpr(
    f"{group_str}",
    f"{time_str}", 
    *interpol_expr
  )
)

相关内容

  • 没有找到相关文章

最新更新