将列添加到一个基于另一个字段的数据框中,但需要增加



我有一个我一直在努力的问题,我必须想象有一个更优雅的解决方案,而不是通过行循环循环循环。我有一个数据框,如下:

          EventTime | ConditionMet
--------- --------- | --------
2017-09-11 00:00:01 | 0
2017-09-11 00:00:02 | 0
2017-09-11 00:00:03 | 0
2017-09-11 00:00:04 | 1
2017-09-11 00:00:05 | 1
2017-09-11 00:00:06 | 1
2017-09-11 00:00:07 | 0
2017-09-11 00:00:08 | 0
2017-09-11 00:00:09 | 1
2017-09-11 00:00:10 | 1
2017-09-11 00:00:11 | 1
2017-09-11 00:00:12 | 1
2017-09-11 00:00:13 | 0

每次满足条件时(ConditionMet=1),我想用事件名称(例如Event1)标记记录。我找不到使用.withColumn()条件或窗口使用CC_2的优雅方法。理想的结果是:

        EventTime   |ConditionMet|EventName
----------------- - | ---------- | --------
2017-09-11 00:00:01 |       0    |     
2017-09-11 00:00:02 |       0    |
2017-09-11 00:00:03 |       0    |
2017-09-11 00:00:04 |       1    | Event1
2017-09-11 00:00:05 |       1    | Event1
2017-09-11 00:00:06 |       1    | Event1
2017-09-11 00:00:07 |       0    |
2017-09-11 00:00:08 |       0    |
2017-09-11 00:00:09 |       1    | Event2
2017-09-11 00:00:10 |       1    | Event2
2017-09-11 00:00:11 |       1    | Event2
2017-09-11 00:00:12 |       1    | Event2
2017-09-11 00:00:13 |       0    | 

对这里的任何聪明方法感兴趣。

我认为,如果想法是用唯一标签标记每个顺序群集,那么您可以通过计算累积总和

来做到这一点
  1. 排序DF,例如
  2. EventTime
  3. 反转ConditionMet
  4. 计算倒置列的累积总和
  5. 忽略ConditionMet = 0的累积总和,并将累积总和作为 ConditionMet = 1

    的群集的标签
    +-------------------+------------+-------+-------------+---------+
    |          EventTime|ConditionMet|InvCond|CumulativeSum|EventName|
    +-------------------+------------+-------+-------------+---------+
    |2017-09-11 00:00:01|           0|      1|            1|         |
    |2017-09-11 00:00:02|           0|      1|            2|         |
    |2017-09-11 00:00:03|           0|      1|            3|         |
    |2017-09-11 00:00:04|           1|      0|            3|   Event3|
    |2017-09-11 00:00:05|           1|      0|            3|   Event3|
    |2017-09-11 00:00:06|           1|      0|            3|   Event3|
    |2017-09-11 00:00:07|           0|      1|            4|         |
    |2017-09-11 00:00:08|           0|      1|            5|         |
    |2017-09-11 00:00:09|           1|      0|            5|   Event5|
    |2017-09-11 00:00:10|           1|      0|            5|   Event5|
    |2017-09-11 00:00:11|           1|      0|            5|   Event5|
    |2017-09-11 00:00:12|           1|      0|            5|   Event5|
    |2017-09-11 00:00:13|           0|      1|            6|         |
    +-------------------+------------+-------+-------------+---------+
    

代码

from pyspark.sql.functions import lag, udf, col
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window
def tagSequentialClusters(df, condColumn, tagColumnName):
    ## Invert condition 0 will be 1 and 1 will be 0
    def InvCond(value):
        if value == '1':
            return 0
        else:
            return 1
    ## Add Event for the valid clusters
    def mapEventNumber(cond, number):
        if cond == "1":
            return "Event" + str(number)
        else:
            return ""
    ## Add new columns to row
    def addRowColumn(row, **kwargs):
        rowData = row.asDict()
        for column in kwargs:
            rowData[column] = kwargs[column]
        return Row(**rowData)
    ## Calculate partial cumulative sum for partition iterator
    def calcPartialCumulativeSum(iter):
        counter = 0     
        final_iterator = []
        for row in iter:
            counter = counter + row["InvCond"]
            newRow = addRowColumn(row, PartialSum=counter)
            final_iterator.append(newRow)
        return final_iterator
    ## get tail of partiton with index
    def getTailWithIndex(index, iter): 
        tailRow = None
        for row in iter:
            tailRow = row
        return (index, tailRow["PartialSum"])
    ## Calculate sum map for each partition
    def calcSumMap(collectedMap):
        final_iterator = {}
        for index, value in enumerate(collectedMap):
            newVal = value
            for i in range(0, index):
                newVal += collectedMap[i]
            final_iterator[index] = newVal
        return final_iterator
    ## Calculate global cumulative sum
    def calcCumulativeSum(index, iter):
        final_iterator = []
        for row in iter:
            newVal = row["PartialSum"] + sumMap.value[index]
            final_iterator.append(addRowColumn(row, EventNumber=newVal))
        return final_iterator
    ## Register udf functions
    invCondUdf = udf(InvCond, IntegerType())
    mapEventNumberUdf = udf(mapEventNumber, StringType())
    ## Invert ConditionMet column
    rdd = df.withColumn("InvCond", invCondUdf(col(condColumn))).rdd
    ## Calculate partial cumulative sum over each partition
    rdd = rdd.mapPartitions(lambda iter: calcPartialCumulativeSum(iter)).cache()
    ## Calculate max sum value for each partition
    collctedMap = rdd.mapPartitionsWithIndex(getTailWithIndex).collect()
    sumMap = spark.sparkContext.broadcast(calcSumMap(collctedMap))
    ## Calculate global cumulative sum 
    df = rdd.mapPartitionsWithIndex(calcCumulativeSum).toDF()
    ## Append `Event` before each cluster number and ignore the rest
    df = df.withColumn(tagColumnName, mapEventNumberUdf(col(condColumn), col("EventNumber")))
    return df.drop(col("EventNumber")).drop(col("InvCond")).drop(col("PartialSum"))
## Read data
df = spark.read.csv("/home/eslam-elbanna/data.csv", header=True)    
## Tag sequnetial clusters
df = tagSequentialClusters(df, "ConditionMet", "EventName")
df.show()
from pyspark.sql import functions as F
df.withColumn('EventName', F.when(df["ConditionMet"]>0.0, "Event1").otherwise("")

我相信我找到了解决问题的方法,尽管我不知道它有多优雅。本质上,我通过上一记录的时间戳加入了数据集(我认为LEAD()函数可以帮助我。那就是我递增EventName创建中使用的全局计数器。

代码。
from pyspark import *
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import lit, col, expr, when
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, array
cnt=0
def tagTrip(t):
     global cnt
     if t[0]==1:
         if t[1]==0:
             cnt=cnt+1
         return "Event" + str(cnt)
     else:
        return ""
tagTripUdf = udf(lambda arr: tagTrip(arr), StringType())
# I don't include the code that shows how I got to this df and how I can join it onto itself
dfJoined = df.join(dfNext, df['time'] == dfNext['timeprev'], 'inner')
dfNew=dfJoined.withColumn('EventName',tagTripUdf(array(col('ConditionMet'),col('ConditionMetNext'))))

相关内容

  • 没有找到相关文章