Framework for generating executable code from business rules in PySpark



I have a dataframe df with sample values as below.

from pyspark.sql.types import DateType, LongType, StringType, StructType, StructField, BooleanType
import os
import pyspark.sql.functions as F
import datetime
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType, IntegerType, ArrayType
from pyspark.sql import Row

# sc and sqlContext are assumed to be available from the PySpark shell/notebook
l = [('test', 1, 0, 1, 0), ('prod', 0, 1, 0, 1), ('local', 1, 0, 1, 0)]
rdd = sc.parallelize(l)
sdf = rdd.map(lambda x: Row(col1=x[0], col2=int(x[1]), col3=int(x[2]), col4=int(x[3]), col5=int(x[4])))
df = sqlContext.createDataFrame(sdf)
+-----+----+----+----+----+
| col1|col2|col3|col4|col5|
+-----+----+----+----+----+
| test|   1|   0|   1|   0|
| prod|   0|   1|   0|   1|
|local|   1|   0|   1|   0|
+-----+----+----+----+----+
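
For reference, the same sample dataframe can also be built through a SparkSession (assuming Spark 2+), without relying on the sc and sqlContext shell variables:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rules-demo").getOrCreate()

# Same sample data, built directly from a list of tuples
df = spark.createDataFrame(
    [('test', 1, 0, 1, 0), ('prod', 0, 1, 0, 1), ('local', 1, 0, 1, 0)],
    ['col1', 'col2', 'col3', 'col4', 'col5']
)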

There are also some business rules, as below. So far they are stored as metadata in a dictionary. (However, the rule metadata could also be stored as: agg_level, agg_function, transformation, source, source_column.)

features = {
    "col6": F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0)),
    "col7": F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0)),
    "col8": F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)),
    "col9": F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0))
}
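
(For reference, if a DataFrame result were acceptable, these Column rules could be applied directly in a loop. Below is a minimal sketch, assuming the df and features above and the col1 = "test" filter used in the desired output further down; apply_rules is just an illustrative name, and the question below asks for the generated code string instead.)

def apply_rules(df, rules, *featurenames):
    # Apply the Column rules directly and return a DataFrame.
    # Falls back to all rules when no feature names are given.
    names = featurenames or tuple(rules.keys())
    out = df.filter(F.col('col1') == 'test')
    for name in names:
        if name in rules:
            out = out.withColumn(name, rules[name])
    return out

# e.g. apply_rules(df, features, 'col6', 'col7')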

I want to create a function, say df_extract(), that dynamically generates executable code like the following. It should return the query to be executed (rather than a dataframe):

df1 = df_extract(df, 'col6', 'col7', 'col8', 'col9')
df1 = df.filter('col1 = "test"') \
    .withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))

When called with three features, only those three features should appear in the returned query, and so on:

df1 = df_extract(df, 'col6', 'col7', 'col8')
df1 = df.filter('col1 = "test"') \
    .withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0)))

Finally, if no features are passed at all, every feature should be present in the expression/query:

df1 = df_extract(df)
df1 = df.filter('col1 = "test"') \
    .withColumn('col6', F.when(F.col('col2') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col7', F.when(F.col('col3') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col8', F.when(F.col('col4') > 0, F.lit(1)).otherwise(F.lit(0))) \
    .withColumn('col9', F.when(F.col('col5') > 0, F.lit(1)).otherwise(F.lit(0)))

Is this possible in any way, at least by creating a SQL table in PySpark? N such transformation rules will be associated with each dataframe, and the function should be able to return the definition dynamically.

I've been stuck trying to come up with a solution.

One solution I have thought of is to store the rules as SQL CASE conditions:

journey_features = {
    "Rules": {
        "col6": "case when col2 > 0 then 1 else 0 end as col6",
        "col7": "case when col3 > 0 then 1 else 0 end as col7",
        "col8": "case when col4 > 0 then 1 else 0 end as col8",
        "col9": "case when col5 > 0 then 1 else 0 end as col9"
    },
    "filter": "col1 == 'test'"
}
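
(Side note: these SQL-string rules can also be applied directly to the dataframe, without assembling a code string and calling eval. A minimal sketch using the journey_features dict above; the output column order follows the dict's insertion order.)

df_direct = (
    df.filter(journey_features['filter'])
      .selectExpr(*journey_features['Rules'].values())
)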

The extract_feature() function below uses the rules as expressions:

def extract_feature(df: str, *featurenames):
    # df is the dataframe's name as a string (e.g. 'df'), not the DataFrame itself
    # Collect the quoted CASE expressions for the requested feature names
    retrieved_features = ""
    for featurename in featurenames:
        if featurename in journey_features.get('Rules'):
            retrieved_features += "'" + str(journey_features.get('Rules')[featurename]) + "'" + ","
    retrieved_features = retrieved_features.rstrip(',')
    # Build the filter clause if one is defined in the metadata
    if journey_features['filter']:
        filter_feature = ".filter({df}.".format(df=df) + str(journey_features['filter']) + ")"
    else:
        filter_feature = ""
    return "{0}{1}.selectExpr({2})".format(df, filter_feature, retrieved_features)

Then pass the dataframe name and the feature names to the function:

extract_feature('df','col6','col7')

The result is:

Out[139]: "df.filter(df.col1 == 'test').selectExpr('case when col2 > 0 then 1 else 0 end as col6','case when col3 > 0 then 1 else 0 end as col7')"

The evaluated string can then be assigned to a dataframe:

df1 = eval(extract_feature('df','col6','col7'))
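
One limitation of extract_feature as written is that it returns an empty selectExpr when no feature names are passed, and it builds a selectExpr string rather than the withColumn chain shown in the question. The sketch below illustrates one way to cover both points; it is not a drop-in replacement, and feature_sql is a hypothetical parallel dict that keeps the CASE expressions without the trailing alias so they can be wrapped in F.expr().

# Hypothetical parallel metadata: the same rules without the "as colX" alias
feature_sql = {
    "col6": "case when col2 > 0 then 1 else 0 end",
    "col7": "case when col3 > 0 then 1 else 0 end",
    "col8": "case when col4 > 0 then 1 else 0 end",
    "col9": "case when col5 > 0 then 1 else 0 end",
}

def df_extract(df_name, *featurenames):
    # Fall back to every rule when no feature names are supplied
    names = featurenames or tuple(feature_sql.keys())
    expr = "{0}.filter('col1 = \"test\"')".format(df_name)
    for name in names:
        if name in feature_sql:
            expr += ".withColumn('{0}', F.expr(\"{1}\"))".format(name, feature_sql[name])
    return expr

# df1 = eval(df_extract('df', 'col6', 'col7'))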
