我有一个客户表,可为每个客户提供有关几个过程的信息。
目标是为每个客户和每个过程提取功能。这意味着每个功能主要是.groupby(customerID, processID)
对象上的汇总或分类计算。
但是,目标是能够随着时间的推移添加越来越多的功能。因此,基本上,用户应该能够通过某些过滤器,指标和聚合来定义新功能,并将此新功能添加到在表上运行的功能池中。
输出应为customerId,ProcessID表,并具有所有功能。
所以我开始了一个最小的工作示例:
l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),
('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),
('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),
('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp'])
+--------+------+-----+---------+
|customid|procid|speed|timestamp|
+--------+------+-----+---------+
| CM1| aa1| 100| 0.1|
| CM1| aa1| 110| 0.2|
| CM1| aa1| 110| 0.9|
| CM1| aa1| 100| 1.5|
| CX2| bb9| 100| 0.1|
| CX2| bb9| 100| 0.2|
| CX2| bb9| 110| 6.0|
| CX2| bb9| 100| 0.18|
+--------+------+-----+---------+
然后我定义了2个任意功能,这些功能被这些功能提取:
def extr_ft_1 (proc_data, limit=100):
proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed))
proc_data = proc_data.select(col('count(speed)').alias('speed_feature'))
proc_data.show()
return proc_data
def extr_ft_0 (proc_data):
max_t = proc_data.agg(spark_max(proc_data.timestamp))
min_t = proc_data.agg(spark_min(proc_data.timestamp))
max_t = max_t.select(col('max(timestamp)').alias('max'))
min_t = min_t.select(col('min(timestamp)').alias('min'))
X = max_t.crossJoin(min_t)
X = X.withColumn('time_feature', X.max+X.min)
X = X.drop(X.min).drop(X.max)
X.show()
return (X)
他们返回只有总值的1元素RRD。接下来,将所有特征功能应用于给定的过程,并在每个过程的结果RDD中合并:
def get_proc_features(proc, data, *features):
proc_data = data.filter( data.customid == proc)
features_for_proc = [feature_value(proc_data) for feature_value in features]
for number, feature in enumerate(features_for_proc):
if number == 0:
l = [(proc,'dummy')]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','dummy'])
df = df.drop(df.dummy)
df.show()
features_for_proc_rdd = feature
features_for_proc_rdd = features_for_proc_rdd.crossJoin(df)
continue
features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature)
features_for_proc_rdd.show()
return features_for_proc_rdd
他们最后一步是将每个过程包含功能包含功能的所有行附加到一个数据框中:
for number, proc in enumerate(customer_list_1):
if number == 0:
#results = get_trip_features(trip, df, extr_ft_0, extr_ft_1)
results = get_proc_features(proc, df, *extr_feature_funcs)
continue
results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs))
results.show()
转换链如下:
获得客户1和2的功能1:
+------------+
|time_feature|
+------------+
| 1.6|
+------------+
+-------------+
|speed_feature|
+-------------+
| 2|
+-------------+
将它们结合到:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
+------------+--------+-------------+
为客户2做同样的事情,并将所有RDD附加到最终结果RDD:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
| 6.1| CX2| 1|
+------------+--------+-------------+
如果我在群集上运行代码,则适用于2个客户。但是,当我以合理数量的客户进行测试时,我会出现GC和堆错误错误。
我在这里与许多RDD合作吗?恐怕我的代码效率很低,但我不知道从哪里开始优化它。我想我只是在最后一个动作(我将所有show()放在现场模式下,然后收集()最后一个rdd)。我真的很感谢您的帮助。
您的代码需要重构,问题不是RDD,而是将其过滤以在单一键上工作,然后交叉加入。通过值迭代会使您失去Pyspark的分布式方面。请记住,如果您不需要另一个功能,则应始终保留一个工作表。
最好的方法是使用dataframes和窗口功能。
首先,让我们重写您的功能:
import pyspark.sql.functions as psf
def extr_ft_1 (proc_data, w, limit=100):
return proc_data.withColumn(
"speed_feature",
psf.sum((proc_data.speed > limit).cast("int")).over(w)
)
def extr_ft_0(proc_data, w):
return proc_data.withColumn(
"time_feature",
psf.min(proc_data.timestamp).over(w) + psf.max(proc_data.timestamp).over(w)
)
其中 w
是窗口规格:
from pyspark.sql import Window
w = Window.partitionBy("customid")
df1 = extr_ft_1(df, w)
df0 = extr_ft_0(df1, w)
df0.show()
+--------+------+-----+---------+-------------+------------+
|customid|procid|speed|timestamp|speed_feature|time_feature|
+--------+------+-----+---------+-------------+------------+
| CM1| aa1| 100| 0.1| 2| 1.6|
| CM1| aa1| 110| 0.2| 2| 1.6|
| CM1| aa1| 110| 0.9| 2| 1.6|
| CM1| aa1| 100| 1.5| 2| 1.6|
| CX2| bb9| 100| 0.1| 1| 6.1|
| CX2| bb9| 100| 0.2| 1| 6.1|
| CX2| bb9| 110| 6.0| 1| 6.1|
| CX2| bb9| 100| 0.18| 1| 6.1|
+--------+------+-----+---------+-------------+------------+
在这里,我们永远不会丢失信息(我们保留所有行),因此,如果您想添加额外的功能。如果您想要最终的汇总结果,只需通过groupBy("customid")
运行。
请注意,您还可以修改窗口规范中的聚合密钥以包括procid
。