PySpark row-wise function composition



As a simplified example, I have a dataframe "df" with columns "col1,col2", and I want to compute a row-wise maximum after applying a function to each column:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def f(x):
    return x + 1

max_udf = udf(lambda x, y: max(x, y), IntegerType())
f_udf = udf(f, IntegerType())
df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))

So if df is:

col1   col2
1      2
3      0

then

df2:

col1   col2  result
1      2     3
3      0     4

The above does not seem to work and produces the error "Cannot evaluate expression: PythonUDF#f…".

I am absolutely sure that "f_udf" works fine on my table; the main problem is max_udf.

Without creating extra columns or falling back to basic map/reduce, is there a way to do the above entirely with dataframes and udfs? How should I modify "max_udf"?

I also tried:

max_udf=udf(max, IntegerType())

which produces the same error.

I also confirmed that the following works:

df2 = (df.withColumn("temp1", f_udf(df.col1))
         .withColumn("temp2", f_udf(df.col2)))
df2 = df2.withColumn("result", max_udf(df2.temp1, df2.temp2))

Why can't I do this in one step?

I would like to see an answer that generalizes to any functions "f_udf" and "max_udf".

I ran into a similar problem and found the solution in an answer to this stackoverflow question.

To pass multiple columns, or a whole row, to a UDF, use a struct:

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType
df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
count_empty_columns = udf(lambda row: len([x for x in row if x is None]), IntegerType())
new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns])))
new_df.show()

Returns:

+----+----+----------+
|   a|   b|null_count|
+----+----+----------+
|null|null|         2|
|   1|null|         1|
|null|   2|         1|
+----+----+----------+
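
Applied back to the original question, the same struct trick might look like this (a minimal sketch, not from the linked answer; it rebuilds the question's sample df):

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

df = sqlContext.createDataFrame([(1, 2), (3, 0)], ("col1", "col2"))
# Apply f (here x + 1) to every field of the row, then take the row-wise maximum
row_max_udf = udf(lambda row: max(x + 1 for x in row), IntegerType())
df2 = df.withColumn("result", row_max_udf(struct(df.col1, df.col2)))
# Expected for the sample data: (1, 2) -> 3 and (3, 0) -> 4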

UserDefinedFunction throws an error when it is given a UDF as an argument.

You can modify max_udf to make it work: either fold f into the udf itself (first block below), or keep f_udf as a plain Python function and compose the column expressions (second block).

df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"])
max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType())
df2 = df.withColumn("result", max_udf(df.col1, df.col2))
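
For the sample data this reproduces the expected result from the question (output sketch; the exact show() formatting can vary slightly across Spark versions):

df2.show()
# +----+----+------+
# |col1|col2|result|
# +----+----+------+
# |   1|   2|     3|
# |   3|   0|     4|
# +----+----+------+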

def f_udf(x):
    # NOTE: here f_udf stays a plain Python function, it is NOT wrapped in udf()
    return x + 1

max_udf = udf(lambda x, y: max(x, y), IntegerType())
## f_udf = udf(f, IntegerType())
df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2)))

Note

The second approach works if and only if the inner function (here f_udf) generates valid SQL expressions.

It works here because f_udf(df.col1) and f_udf(df.col2) are evaluated to Column<b'(col1 + 1)'> and Column<b'(col2 + 1)'> respectively before being passed to max_udf. It would not work with an arbitrary function.

For example, it would not work if we tried something like this:

from math import exp
# math.exp expects a plain number, not a Column, so this fails
df.withColumn("result", max_udf(exp(df.col1), exp(df.col2)))
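
One way to handle an arbitrary Python function is to do the composition inside a single UDF, so that Spark only ever sees plain column references (a minimal sketch, not from the original answers; it assumes the df with numeric columns col1 and col2 from above):

from math import exp

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# exp and max are composed in plain Python inside one UDF
max_exp_udf = udf(lambda x, y: max(exp(x), exp(y)), DoubleType())
df.withColumn("result", max_exp_udf(df.col1, df.col2)).show()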

The best way to handle this is to escape the pyspark.sql.DataFrame representation and work with pyspark RDDs, going back and forth via DataFrame.rdd and RDD.toDF().

import typing
# Save yourself some pain and always import these things: functions as F and types as T
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row, SparkSession, SQLContext

spark = (
    SparkSession.builder.appName("Stack Overflow Example")
    .getOrCreate()
)
sc = spark.sparkContext
# sqlContet is needed sometimes to create DataFrames from RDDs
sqlContext = SQLContext(sc)
df = sc.parallelize(
    [Row(a="hello", b=1, c=2), Row(a="goodbye", b=2, c=1)]
).toDF(["a", "b", "c"])

def to_string(record:dict) -> Row:
    """Create a readable string representation of the record"""
    
    record["readable"] = f'Word: {record["a"]} A: {record["b"]} B: {record["c"]}'
    return Row(**record)

# Apply the function with a map after converting the Row to a dict
readable_rdd = df.rdd.map(lambda x: x.asDict()).map(to_string)
# Test the function without running the entire DataFrame through it
print(readable_rdd.first())
# This results in: Row(a='hello', b=1, c=2, readable='Word: hello A: 1 B: 2')
# Sometimes you can use `toDF()` to get a dataframe
readable_df = readable_rdd.toDF()
readable_df.show()
# +-------+---+---+--------------------+
# |      a|  b|  c|            readable|
# +-------+---+---+--------------------+
# |  hello|  1|  2|Word: hello A: 1 ...|
# |goodbye|  2|  1|Word: goodbye A: ...|
# +-------+---+---+--------------------+
# Sometimes you have to use createDataFrame with a specified schema
schema = T.StructType(
    [
        T.StructField("a", T.StringType(), True),
        T.StructField("b", T.IntegerType(), True),
        T.StructField("c", T.StringType(), True),
        T.StructField("readable", T.StringType(), True),
    ]
)
# This is more reliable, you should use it in production!
readable_df = sqlContext.createDataFrame(readable_rdd, schema)
readable_df.show()
# +-------+---+---+--------------------+
# |      a|  b|  c|            readable|
# +-------+---+---+--------------------+
# |  hello|  1|  2|Word: hello A: 1 ...|
# |goodbye|  2|  1|Word: goodbye A: ...|
# +-------+---+---+--------------------+

Sometimes the RDD.map() approach cannot use certain Python libraries, because the mapper gets serialized. In that case you partition the data into enough partitions to occupy all the cores of the cluster, and then use pyspark.RDD.mapPartitions() to process an entire partition (just an Iterable of dicts) at a time. This lets you instantiate an expensive object once, such as a spaCy language model, and apply it to one record at a time without recreating it.

def to_string_partition(partition:typing.Iterable[dict]) -> typing.Iterable[Row]:
    """Add a readable string form to an entire partition"""
    # Instantiate expensive objects here
    
    # Apply these objects' methods here
    for record in partition:
        record["readable"] = f'Word: {record["a"]} A: {record["b"]} B: {record["c"]}'
        yield Row(**record)

readable_rdd = df.rdd.map(lambda x: x.asDict()).mapPartitions(to_string_partition)
print(readable_rdd.first())
# Row(a='hello', b=1, c=2, readable='Word: hello A: 1 B: 2')
# mapPartitions are more likely to require a specified schema
schema = T.StructType(
    [
        T.StructField("a", T.StringType(), True),
        T.StructField("b", T.IntegerType(), True),
        T.StructField("c", T.StringType(), True),
        T.StructField("readable", T.StringType(), True),
    ]
)
# This is more reliable, you should use it in production!
readable_df = sqlContext.createDataFrame(readable_rdd, schema)
readable_df.show()
# +-------+---+---+--------------------+
# |      a|  b|  c|            readable|
# +-------+---+---+--------------------+
# |  hello|  1|  2|Word: hello A: 1 ...|
# |goodbye|  2|  1|Word: goodbye A: ...|
# +-------+---+---+--------------------+
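
As a concrete sketch of the "expensive object" pattern described above (spaCy and its en_core_web_sm model are assumed to be installed on the workers; purely illustrative):

import spacy  # assumed available on every worker

def tokenize_partition(partition: typing.Iterable[dict]) -> typing.Iterable[Row]:
    """Load the spaCy model once per partition, then process every record with it"""
    nlp = spacy.load("en_core_web_sm")  # expensive: done once per partition, not per record
    for record in partition:
        record["tokens"] = [token.text for token in nlp(record["a"])]
        yield Row(**record)

tokens_rdd = df.rdd.map(lambda x: x.asDict()).mapPartitions(tokenize_partition)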

The DataFrame APIs are nice because they make SQL-like operations fast, but sometimes you need the power of unrestricted, plain Python, and learning to use RDDs will greatly benefit your analytics practice. For example, you can group records and then evaluate an entire group in RAM, as long as it fits, which you can arrange by changing the partition key and limiting workers / increasing their RAM.

import numpy as np

def median_b(x):
    """Process a group and determine the median value"""
    
    key = x[0]
    values = x[1]
    
    # Get the median value
    m = np.median([record["b"] for record in values])
    # Return a Row of the median for each group
    return Row(**{"a": key, "median_b": m})

median_b_rdd = df.rdd.map(lambda x: x.asDict()).groupBy(lambda x: x["a"]).map(median_b)
median_b_rdd.first()
# Row(a='hello', median_b=1.0)
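
If you then need a DataFrame again, the same createDataFrame-with-an-explicit-schema pattern from above applies (a sketch; the float() cast is there because some Spark versions reject numpy scalars during schema verification):

# Convert numpy scalars to plain Python floats, then rebuild a DataFrame
median_rows = median_b_rdd.map(lambda r: Row(a=r["a"], median_b=float(r["median_b"])))
median_schema = T.StructType(
    [
        T.StructField("a", T.StringType(), True),
        T.StructField("median_b", T.DoubleType(), True),
    ]
)
median_b_df = sqlContext.createDataFrame(median_rows, median_schema)
median_b_df.show()
# Row order may vary:
# +-------+--------+
# |      a|median_b|
# +-------+--------+
# |  hello|     1.0|
# |goodbye|     2.0|
# +-------+--------+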

Here is a useful piece of code written specifically to create any new column by simply calling a top-level business rule, completely isolated from the technical and heavy Spark machinery (no more spending dollars on, or depending on, Databricks libraries). My advice: in your organization, try to keep things simple and clean, for the benefit of your top-level data users:

def createColumnFromRule(df, columnName, ruleClass, ruleName, inputColumns=None, inputValues=None, columnType=None):
    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    def _getSparkClassType(shortType):
        defaultSparkClassType = "StringType"
        typesMapping = {
            "bigint"    : "LongType",
            "binary"    : "BinaryType",
            "boolean"   : "BooleanType",
            "byte"      : "ByteType",
            "date"      : "DateType",
            "decimal"   : "DecimalType",
            "double"    : "DoubleType",
            "float"     : "FloatType",
            "int"       : "IntegerType",
            "integer"   : "IntegerType",
            "long"      : "LongType",
            "numeric"   : "NumericType",
            "string"    : defaultSparkClassType,
            "timestamp" : "TimestampType"
        }
        # Fall back to the default for unknown short type names
        return typesMapping.get(shortType, defaultSparkClassType)
    if columnType is not None:
        sparkClassType = _getSparkClassType(columnType)
    else:
        sparkClassType = "StringType"
    # Look up the rule method and the Spark type by name instead of using eval()
    aUdf = F.udf(getattr(ruleClass, ruleName), getattr(T, sparkClassType)())
    columns = None
    values = None
    if inputColumns is not None:
        columns = F.struct([df[column] for column in inputColumns])
    if inputValues is not None:
        values = F.struct([F.lit(value) for value in inputValues])
    # Call the rule
    if inputColumns is not None and inputValues is not None:
        df = df.withColumn(columnName, aUdf(columns, values))
    elif inputColumns is not None:
        df = df.withColumn(columnName, aUdf(columns, F.lit(None)))
    elif inputValues is not None:
        df = df.withColumn(columnName, aUdf(F.lit(None), values))
    # Create a Null column otherwise
    elif columnType is not None:
        df = df.withColumn(columnName, F.lit(None).cast(columnType))
    else:
        df = df.withColumn(columnName, F.lit(None))
    # Return the resulting dataframe
    return df

Usage example:

# Define your business rule (you can get columns and values)
class CustomerRisk:
    def churnRisk(self, columns=None, values=None):
        isChurnRisk = False
        # ... Rule implementation starts here
        if values is not None and values[0] == "FORCE_CHURN=true":
            isChurnRisk = True
        if not isChurnRisk and columns is not None and columns["AGE"] <= 25:
            isChurnRisk = True
        # ...
        return isChurnRisk
# Execute the rule: it creates your new column in one line of code. That's all, easy isn't it?
# Note how both columns and values are passed in, it's really easy!
df = createColumnFromRule(df, columnName="CHURN_RISK", ruleClass=CustomerRisk(), ruleName="churnRisk", columnType="boolean", inputColumns=["NAME", "AGE", "ADDRESS"], inputValues=["FORCE_CHURN=true", "CHURN_RISK=100%"])
