As a simplified example, I have a dataframe "df" with columns "col1,col2" and I want to compute a row-wise maximum after applying a function to each column:
def f(x):
    return (x + 1)
max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())
df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))
So if df is:
col1 col2
1 2
3 0
then
df2:
col1 col2 result
1 2 3
3 0 4
The above doesn't seem to work and produces "Cannot evaluate expression: PythonUDF#f…".
I am absolutely positive that "f_udf" works just fine on my table; the main problem is max_udf.
Without creating extra columns or falling back to basic map/reduce, is there a way to do the above entirely with dataframes and udfs? How should I modify "max_udf"?
I've also tried:
max_udf=udf(max, IntegerType())
which produces the same error.
I also confirmed that the following works:
df2 = (df.withColumn("temp1", f_udf(df.col1))
         .withColumn("temp2", f_udf(df.col2)))
df2 = df2.withColumn("result", max_udf(df2.temp1, df2.temp2))
Why can't I do this in one go?
I would like to see an answer that generalizes to any functions "f_udf" and "max_udf".
I ran into a similar problem and found the solution in the answer to this stackoverflow question.
To pass multiple columns or a whole row to a UDF, use a struct:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType
df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType())
new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))
new_df.show()
which returns:
+----+----+----------+
| a| b|null_count|
+----+----+----------+
|null|null| 2|
| 1|null| 1|
|null| 2| 1|
+----+----+----------+
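Going back to the original df with col1 and col2, the same struct trick might look roughly like this (a sketch, not tested against the exact setup above); both f and the max happen inside a single UDF, so no intermediate columns are needed:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

def f(x):
    return x + 1

# The UDF receives the whole struct as a Row and works on plain Python values
row_max_udf = udf(lambda row: max(f(row[0]), f(row[1])), IntegerType())
df2 = df.withColumn("result", row_max_udf(struct(df.col1, df.col2)))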
UserDefinedFunction throws an error when it is given a UDF as an argument. You can modify max_udf as below to make it work.
df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"])
max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType())
df2 = df.withColumn("result", max_udf(df.col1, df.col2))
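For reference, with the example data from the question this should produce the expected result (the values simply follow from max(x + 1, y + 1)):
df2.show()
# +----+----+------+
# |col1|col2|result|
# +----+----+------+
# |   1|   2|     3|
# |   3|   0|     4|
# +----+----+------+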
Or
def f_udf(x):
    return (x + 1)
max_udf = udf(lambda x, y: max(x, y), IntegerType())
## f_udf=udf(f, IntegerType())
df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))
Note:
The second approach works if and only if the internal function (here f_udf) generates valid SQL expressions. It works here because f_udf(df.col1) and f_udf(df.col2) are evaluated as Column<b'(col1 + 1)'> and Column<b'(col2 + 1)'>, respectively, before being passed to max_udf. It does not work with an arbitrary function.
For example, it will not work if we try something like this:
from math import exp
df.withColumn("result", max_udf(exp(df.col1), exp(df.col2)))
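If you do need an arbitrary Python function, one workaround (a sketch of my own, not strictly part of this answer) is to move it inside a single UDF so it operates on plain Python values rather than on Column expressions:
from math import exp
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# exp now runs on Python numbers inside the UDF, not on Column objects
max_exp_udf = udf(lambda x, y: max(exp(x), exp(y)), DoubleType())
df2 = df.withColumn("result", max_exp_udf(df.col1, df.col2))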
The best way to handle this is to escape the pyspark.sql.DataFrame representation and work with pyspark.RDDs instead, via pyspark.sql.DataFrame.rdd.map() and pyspark.RDD.mapPartitions().
import typing
# Save yourself some pain and always import these things: functions as F and types as T
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row, SparkSession, SQLContext
spark = (
    SparkSession.builder.appName("Stack Overflow Example")
    .getOrCreate()
)
sc = spark.sparkContext
# sqlContext is needed sometimes to create DataFrames from RDDs
sqlContext = SQLContext(sc)
df = sc.parallelize([Row(**{"a": "hello", "b": 1, "c": 2}), Row(**{"a": "goodbye", "b": 2, "c": 1})]).toDF(["a", "b", "c"])
def to_string(record: dict) -> Row:
    """Create a readable string representation of the record"""
    record["readable"] = f'Word: {record["a"]} A: {record["b"]} B: {record["c"]}'
    return Row(**record)
# Apply the function with a map after converting the Row to a dict
readable_rdd = df.rdd.map(lambda x: x.asDict()).map(to_string)
# Test the function without running the entire DataFrame through it
print(readable_rdd.first())
# This results in: Row(a='hello', b=1, c=2, readable='Word: hello A: 1 B: 2')
# Sometimes you can use `toDF()` to get a dataframe
readable_df = readable_rdd.toDF()
readable_df.show()
# +-------+---+---+--------------------+
# | a| b| c| readable|
# +-------+---+---+--------------------+
# | hello| 1| 2|Word: hello A: 1 ...|
# |goodbye| 2| 1|Word: goodbye A: ...|
# +-------+---+---+--------------------+
# Sometimes you have to use createDataFrame with a specified schema
schema = T.StructType(
    [
        T.StructField("a", T.StringType(), True),
        T.StructField("b", T.IntegerType(), True),
        T.StructField("c", T.IntegerType(), True),
        T.StructField("readable", T.StringType(), True),
    ]
)
# This is more reliable, you should use it in production!
readable_df = sqlContext.createDataFrame(readable_rdd, schema)
readable_df.show()
# +-------+---+---+--------------------+
# | a| b| c| readable|
# +-------+---+---+--------------------+
# | hello| 1| 2|Word: hello A: 1 ...|
# |goodbye| 2| 1|Word: goodbye A: ...|
# +-------+---+---+--------------------+
Sometimes the RDD.map() function can't make use of certain Python libraries because the mapper gets serialized, so you need to partition the data into enough partitions to occupy all the cores of the cluster and then use pyspark.RDD.mapPartitions() to process an entire partition (just an Iterable of dicts) at a time. This enables you to instantiate an expensive object once, like a spaCy language model, and apply it to the records one at a time without recreating it.
def to_string_partition(partition: typing.Iterable[dict]) -> typing.Iterable[Row]:
    """Add a readable string form to an entire partition"""
    # Instantiate expensive objects here
    # Apply these objects' methods here
    for record in partition:
        record["readable"] = f'Word: {record["a"]} A: {record["b"]} B: {record["c"]}'
        yield Row(**record)
readable_rdd = df.rdd.map(lambda x: x.asDict()).mapPartitions(to_string_partition)
print(readable_rdd.first())
# Row(a='hello', b=1, c=2, readable='Word: hello A: 1 B: 2')
# mapPartitions are more likely to require a specified schema
schema = T.StructType(
    [
        T.StructField("a", T.StringType(), True),
        T.StructField("b", T.IntegerType(), True),
        T.StructField("c", T.IntegerType(), True),
        T.StructField("readable", T.StringType(), True),
    ]
)
# This is more reliable, you should use it in production!
readable_df = sqlContext.createDataFrame(readable_rdd, schema)
readable_df.show()
# +-------+---+---+--------------------+
# | a| b| c| readable|
# +-------+---+---+--------------------+
# | hello| 1| 2|Word: hello A: 1 ...|
# |goodbye| 2| 1|Word: goodbye A: ...|
# +-------+---+---+--------------------+
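To make the expensive-object pattern mentioned above concrete, here is a minimal sketch; the "model" below is just a stand-in I made up (in real code it might be something like a spaCy pipeline loaded once per partition):
def tag_partition(partition: typing.Iterable[dict]) -> typing.Iterable[Row]:
    """Instantiate a (pretend) expensive object once, then reuse it per record"""
    expensive_model = str.upper  # stand-in for e.g. spacy.load("en_core_web_sm")
    for record in partition:
        record["tagged"] = expensive_model(record["a"])
        yield Row(**record)

tagged_rdd = df.rdd.map(lambda x: x.asDict()).mapPartitions(tag_partition)
print(tagged_rdd.first())
# Row(a='hello', b=1, c=2, tagged='HELLO')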
The DataFrame APIs are good because they let SQL-like operations run faster, but sometimes you need the power of unrestricted Python, and learning to work with RDDs will greatly benefit your analytics practice. You can, for example, group records and then evaluate an entire group in RAM, as long as it fits, which you can arrange by changing the partition key and limiting your workers / increasing their RAM.
import numpy as np
def median_b(x):
    """Process a group and determine the median value"""
    key = x[0]
    values = x[1]
    # Get the median value
    m = np.median([record["b"] for record in values])
    # Return a Row of the median for each group
    return Row(**{"a": key, "median_b": m})
median_b_rdd = df.rdd.map(lambda x: x.asDict()).groupBy(lambda x: x["a"]).map(median_b)
median_b_rdd.first()
# Row(a='hello', median_b=1.0)
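If you need the grouped result back as a DataFrame, the same createDataFrame-with-a-schema pattern from above applies (a sketch, assuming the median can be treated as a double):
median_schema = T.StructType(
    [
        T.StructField("a", T.StringType(), True),
        T.StructField("median_b", T.DoubleType(), True),
    ]
)
median_b_df = sqlContext.createDataFrame(median_b_rdd, median_schema)
median_b_df.show()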
Below is a useful piece of code made specifically to create any new column by simply calling a top-level business rule, completely isolated from the technical and heavy Spark stuff (no need to spend $ or depend on Databricks libraries anymore). My advice: in your organization, try to do things simply and cleanly, for the benefit of your top-level data users:
def createColumnFromRule(df, columnName, ruleClass, ruleName, inputColumns=None, inputValues=None, columnType=None):
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    def _getSparkClassType(shortType):
        defaultSparkClassType = "StringType"
        typesMapping = {
            "bigint" : "LongType",
            "binary" : "BinaryType",
            "boolean" : "BooleanType",
            "byte" : "ByteType",
            "date" : "DateType",
            "decimal" : "DecimalType",
            "double" : "DoubleType",
            "float" : "FloatType",
            "int" : "IntegerType",
            "integer" : "IntegerType",
            "long" : "LongType",
            "numeric" : "NumericType",
            "string" : defaultSparkClassType,
            "timestamp" : "TimestampType"
        }
        sparkClassType = None
        try:
            sparkClassType = typesMapping[shortType]
        except:
            sparkClassType = defaultSparkClassType
        return sparkClassType

    if (columnType != None): sparkClassType = _getSparkClassType(columnType)
    else: sparkClassType = "StringType"
    aUdf = eval("F.udf(ruleClass." + ruleName + ", T." + sparkClassType + "())")
    columns = None
    values = None
    if (inputColumns != None): columns = F.struct([df[column] for column in inputColumns])
    if (inputValues != None): values = F.struct([F.lit(value) for value in inputValues])
    # Call the rule
    if (inputColumns != None and inputValues != None): df = df.withColumn(columnName, aUdf(columns, values))
    elif (inputColumns != None): df = df.withColumn(columnName, aUdf(columns, F.lit(None)))
    elif (inputValues != None): df = df.withColumn(columnName, aUdf(F.lit(None), values))
    # Create a Null column otherwise
    else:
        if (columnType != None):
            df = df.withColumn(columnName, F.lit(None).cast(columnType))
        else:
            df = df.withColumn(columnName, F.lit(None))
    # Return the resulting dataframe
    return df
Usage example:
# Define your business rule (you can get columns and values)
class CustomerRisk:
    def churnRisk(self, columns=None, values=None):
        isChurnRisk = False
        # ... Rule implementation starts here
        if (values != None):
            if (values[0] == "FORCE_CHURN=true"): isChurnRisk = True
        if (isChurnRisk == False and columns != None):
            if (columns["AGE"] <= 25): isChurnRisk = True
        # ...
        return isChurnRisk
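# A hypothetical input DataFrame with the columns the rule expects, so the call
# below can actually be executed (assumes a SparkSession named spark; adjust to
# your own data):
df = spark.createDataFrame(
    [("Alice", 22, "1 Main St"), ("Bob", 40, "2 Side St")],
    ["NAME", "AGE", "ADDRESS"],
)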
# Execute the rule; it will create your new column in one line of code. That's all, easy isn't it?
# And look how easy it is to pass columns and values!
df = createColumnFromRule(df, columnName="CHURN_RISK", ruleClass=CustomerRisk(), ruleName="churnRisk", columnType="boolean", inputColumns=["NAME", "AGE", "ADDRESS"], inputValues=["FORCE_CHURN=true", "CHURN_RISK=100%"])