Pyspark函数本身可以正常工作,但在封装在UDF中时不执行任务



我有这个函数,它接受代码并检查代码是否被使用(即:在used_codes字典中)。如果它没有被使用过,那么它就会输出相同的代码;如果它被使用过,那么它就会生成新的代码。然后创建一个新的df,列"code_id">

我的函数本身工作正常,但当它通过udf时,它不执行任务。我的used_codes字典是空的,即使我有大量的重复代码,应该添加到使用,然后替换。

我不确定为什么它在UDF中包装之前工作,而不是作为UDF运行时。

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F
import pyspark.sql.types as T
import random
data = [("James", "36636"),
("Michael", "36636"),
("Robert", "42114"),
("Maria", "39192"),
("Jen", "39192")
]
schema = StructType([ 
StructField("firstname",StringType(),True), 
StructField("id", StringType(), True), 
])
df = spark.createDataFrame(data=data,schema=schema)
used_codes = {}
def generate_random_code():
random_number = random.randint(10000,90000)
return random_number
def get_valid_code(code):
global used_codes
if(code != "" and code not in used_codes.keys()):
used_codes[code] = 1 
return code
new_code = generate_random_code()
while (new_code in used_codes.keys()):
new_code = generate_random_code() 
used_codes[new_code] = 2
return new_code
get_valid_code_udf = F.udf(lambda code: get_valid_code(code), T.StringType())
df = spark.createDataFrame(data=data,schema=schema)
new_df = df.withColumn("code_id", get_valid_code_udf('id'))
df.show()
+---------+-----+                                                               
|firstname|   id|
+---------+-----+
|    James|36636|
|  Michael|36636|
|   Robert|42114|
|    Maria|39192|
|      Jen|39192|
+---------+-----+
>>> new_df.show()
+---------+-----+-------+
|firstname|   id|code_id|
+---------+-----+-------+
|    James|36636|  36636|
|  Michael|36636|  63312|
|   Robert|42114|  42114|
|    Maria|39192|  39192|
|      Jen|39192|  76399|
+---------+-----+-------+

在函数中使用全局变量used_codes。这个全局变量存在于工作器中,这可能就是为什么您的函数没有像UDF那样工作,即使它仍然

相关内容

  • 没有找到相关文章

最新更新