I have a problem I've been working on, and I have to imagine there is a more elegant solution than looping over the rows. I have a DataFrame like this:
EventTime           | ConditionMet
--------------------|-------------
2017-09-11 00:00:01 | 0
2017-09-11 00:00:02 | 0
2017-09-11 00:00:03 | 0
2017-09-11 00:00:04 | 1
2017-09-11 00:00:05 | 1
2017-09-11 00:00:06 | 1
2017-09-11 00:00:07 | 0
2017-09-11 00:00:08 | 0
2017-09-11 00:00:09 | 1
2017-09-11 00:00:10 | 1
2017-09-11 00:00:11 | 1
2017-09-11 00:00:12 | 1
2017-09-11 00:00:13 | 0
Each time the condition is met (ConditionMet=1), I want to tag those records with an event name (e.g. Event1). I can't find an elegant way to do this with .withColumn() plus conditions or a window. The ideal result is:
EventTime           | ConditionMet | EventName
--------------------|--------------|----------
2017-09-11 00:00:01 | 0 |
2017-09-11 00:00:02 | 0 |
2017-09-11 00:00:03 | 0 |
2017-09-11 00:00:04 | 1 | Event1
2017-09-11 00:00:05 | 1 | Event1
2017-09-11 00:00:06 | 1 | Event1
2017-09-11 00:00:07 | 0 |
2017-09-11 00:00:08 | 0 |
2017-09-11 00:00:09 | 1 | Event2
2017-09-11 00:00:10 | 1 | Event2
2017-09-11 00:00:11 | 1 | Event2
2017-09-11 00:00:12 | 1 | Event2
2017-09-11 00:00:13 | 0 |
Interested in any clever approaches here.
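For reference, a minimal snippet to reproduce the sample data above (an illustrative sketch, assuming a SparkSession named spark; ConditionMet is kept as a string, which is what reading the data from CSV with header=True would give):

from pyspark.sql import Row

flags = [0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0]
rows = [Row(EventTime="2017-09-11 00:00:%02d" % (i + 1), ConditionMet=str(c))
        for i, c in enumerate(flags)]
df = spark.createDataFrame(rows)
df.show(truncate=False)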
I think, if the idea is to tag each sequential cluster with a unique label, you can do it by computing a cumulative sum:

- sort the DF, e.g. on EventTime
- invert ConditionMet
- compute the cumulative sum of the inverted column
- ignore the label for the clusters where ConditionMet = 0, and use the cumulative sum as the label where ConditionMet = 1

(A more compact window-function sketch of the same idea is shown after the code below.)

+-------------------+------------+-------+-------------+---------+
|          EventTime|ConditionMet|InvCond|CumulativeSum|EventName|
+-------------------+------------+-------+-------------+---------+
|2017-09-11 00:00:01|           0|      1|            1|         |
|2017-09-11 00:00:02|           0|      1|            2|         |
|2017-09-11 00:00:03|           0|      1|            3|         |
|2017-09-11 00:00:04|           1|      0|            3|   Event3|
|2017-09-11 00:00:05|           1|      0|            3|   Event3|
|2017-09-11 00:00:06|           1|      0|            3|   Event3|
|2017-09-11 00:00:07|           0|      1|            4|         |
|2017-09-11 00:00:08|           0|      1|            5|         |
|2017-09-11 00:00:09|           1|      0|            5|   Event5|
|2017-09-11 00:00:10|           1|      0|            5|   Event5|
|2017-09-11 00:00:11|           1|      0|            5|   Event5|
|2017-09-11 00:00:12|           1|      0|            5|   Event5|
|2017-09-11 00:00:13|           0|      1|            6|         |
+-------------------+------------+-------+-------------+---------+

Code:
from pyspark.sql.functions import lag, udf, col
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window

def tagSequentialClusters(df, condColumn, tagColumnName):
    ## Invert the condition: 0 becomes 1 and 1 becomes 0
    def InvCond(value):
        if value == '1':
            return 0
        else:
            return 1

    ## Add the Event tag for the valid clusters
    def mapEventNumber(cond, number):
        if cond == "1":
            return "Event" + str(number)
        else:
            return ""

    ## Add new columns to a row
    def addRowColumn(row, **kwargs):
        rowData = row.asDict()
        for column in kwargs:
            rowData[column] = kwargs[column]
        return Row(**rowData)

    ## Calculate the partial cumulative sum for a partition iterator
    def calcPartialCumulativeSum(iter):
        counter = 0
        final_iterator = []
        for row in iter:
            counter = counter + row["InvCond"]
            newRow = addRowColumn(row, PartialSum=counter)
            final_iterator.append(newRow)
        return final_iterator

    ## Get the tail of a partition together with its index
    def getTailWithIndex(index, iter):
        tailRow = None
        for row in iter:
            tailRow = row
        return (index, tailRow["PartialSum"])

    ## Calculate the sum offset map for each partition
    def calcSumMap(collectedMap):
        final_iterator = {}
        for index, value in enumerate(collectedMap):
            newVal = value
            for i in range(0, index):
                newVal += collectedMap[i]
            final_iterator[index] = newVal
        return final_iterator

    ## Calculate the global cumulative sum
    def calcCumulativeSum(index, iter):
        final_iterator = []
        for row in iter:
            newVal = row["PartialSum"] + sumMap.value[index]
            final_iterator.append(addRowColumn(row, EventNumber=newVal))
        return final_iterator

    ## Register udf functions
    invCondUdf = udf(InvCond, IntegerType())
    mapEventNumberUdf = udf(mapEventNumber, StringType())

    ## Invert the ConditionMet column
    rdd = df.withColumn("InvCond", invCondUdf(col(condColumn))).rdd

    ## Calculate the partial cumulative sum over each partition
    rdd = rdd.mapPartitions(lambda iter: calcPartialCumulativeSum(iter)).cache()

    ## Collect the max sum value for each partition and broadcast the offsets
    collectedMap = rdd.mapPartitionsWithIndex(getTailWithIndex).collect()
    sumMap = spark.sparkContext.broadcast(calcSumMap(collectedMap))

    ## Calculate the global cumulative sum
    df = rdd.mapPartitionsWithIndex(calcCumulativeSum).toDF()

    ## Prepend `Event` to each cluster number and ignore the rest
    df = df.withColumn(tagColumnName, mapEventNumberUdf(col(condColumn), col("EventNumber")))
    return df.drop(col("EventNumber")).drop(col("InvCond")).drop(col("PartialSum"))

## Read data
df = spark.read.csv("/home/eslam-elbanna/data.csv", header=True)

## Tag sequential clusters
df = tagSequentialClusters(df, "ConditionMet", "EventName")
df.show()
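For comparison, the same invert-and-cumulative-sum idea can be expressed with a single window function instead of the manual per-partition bookkeeping. This is only a sketch, assuming the whole frame can be ordered by EventTime in one unpartitioned window and that ConditionMet is a string column as read from CSV:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One ordered window over the whole frame (fine for small data; an
# unpartitioned window funnels every row through a single task)
w = Window.orderBy("EventTime")

tagged = (df
          # Invert the flag: 1 outside a cluster, 0 inside it
          .withColumn("InvCond", 1 - F.col("ConditionMet").cast("int"))
          # The running sum of the inverted flag is constant inside each cluster
          .withColumn("GroupId", F.sum("InvCond").over(w))
          # Label only the rows where the condition is met
          .withColumn("EventName",
                      F.when(F.col("ConditionMet").cast("int") == 1,
                             F.concat(F.lit("Event"), F.col("GroupId").cast("string")))
                       .otherwise(F.lit("")))
          .drop("InvCond", "GroupId"))
tagged.show(truncate=False)

As in the output above, the labels are increasing but not consecutive (Event3, Event5, ...); producing the question's Event1/Event2 numbering would need an additional dense ranking of the group ids.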
from pyspark.sql import functions as F

df.withColumn('EventName', F.when(df["ConditionMet"] > 0.0, "Event1").otherwise(""))
I believe I found a solution to my problem, although I don't know how elegant it is. Essentially, I join the dataset onto itself keyed by the previous record's timestamp (I think the LEAD() function could have helped me here). That is where I increment the global counter used to build EventName.
Code:

from pyspark import *
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import lit, col, expr, when
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, array

cnt = 0

def tagTrip(t):
    ## Global counter, incremented each time a new cluster starts
    global cnt
    if t[0] == 1:
        if t[1] == 0:
            cnt = cnt + 1
        return "Event" + str(cnt)
    else:
        return ""

tagTripUdf = udf(lambda arr: tagTrip(arr), StringType())

# I don't include the code that shows how I got to this df and how I can join it onto itself
dfJoined = df.join(dfNext, df['time'] == dfNext['timeprev'], 'inner')
dfNew = dfJoined.withColumn('EventName', tagTripUdf(array(col('ConditionMet'), col('ConditionMetNext'))))
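Since the construction of dfNext is omitted above, here is a hypothetical sketch of the same "compare with the neighbouring record" idea done with lag() over a window, which avoids both the self-join and the global counter. The window ordering and column names are assumptions following the question's schema, not the omitted code, and ConditionMet is assumed to be a string as read from CSV:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy("EventTime")

dfTagged = (df
            # Previous row's flag; default '0' for the first row
            .withColumn("ConditionMetPrev", F.lag("ConditionMet", 1, "0").over(w))
            # A cluster starts wherever the flag flips 0 -> 1
            .withColumn("NewCluster",
                        ((F.col("ConditionMet") == "1") &
                         (F.col("ConditionMetPrev") == "0")).cast("int"))
            # Running count of the starts numbers the clusters consecutively
            .withColumn("EventNumber", F.sum("NewCluster").over(w))
            .withColumn("EventName",
                        F.when(F.col("ConditionMet") == "1",
                               F.concat(F.lit("Event"), F.col("EventNumber").cast("string")))
                         .otherwise(F.lit("")))
            .drop("ConditionMetPrev", "NewCluster", "EventNumber"))
dfTagged.show(truncate=False)

Unlike the cumulative-sum labels in the first answer, this numbers the clusters consecutively (Event1, Event2, ...), which matches the ideal output in the question.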