Iterate over a PySpark dataframe and send each value to a UDF



I have a dataframe like the one below:

+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|firstname|middlename|lastname|id   |gender|salary|meta                                                                                                |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|James    |          |Smith   |36636|M     |3000  |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}    |
|Michael  |Rose      |        |40288|M     |4000  |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}   |
|Robert   |          |Williams|42114|M     |4000  |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|
|Maria    |Anne      |Jones   |39192|F     |4000  |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|
|Jen      |Mary      |Brown   |     |F     |-1    |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}         |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+  

Now I have a UDF, and I need to iterate over the meta column and pass each row to it. However, only the first row ever gets passed.

The code is as follows:

def parse_and_post(col):
    for i in col.collect():
        print(i)
        result = json.loads(i)
        print(result["firstname"])
        # Below is a sample check
        if result["firstname"] == "James":
            return 200
        else:
            return -1
        # Actual check is as follows
        # Format the record in the form of list
        # get token
        # response = send the record to the API
        # return the response

new_df_status = new_df.withColumn("status_of_each_record", lit(parse_and_post(new_df.rdd.map(lambda x: x["meta"]))))

When I execute this code, I get the output below. However, only the first record's status should be 200; the rest should be -1:

{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}
James
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|firstname|middlename|lastname|id   |gender|salary|meta                                                                                                |status_of_each_record|
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|James    |          |Smith   |36636|M     |3000  |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}    |200                  |
|Michael  |Rose      |        |40288|M     |4000  |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}   |200                  |
|Robert   |          |Williams|42114|M     |4000  |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|200                  |
|Maria    |Anne      |Jones   |39192|F     |4000  |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|200                  |
|Jen      |Mary      |Brown   |     |F     |-1    |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}         |200                  |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+

How do I iterate over each row of the meta column? What exactly am I doing wrong?

I think the main problem here is that a user-defined function is meant to be called once per row, rather than being handed the whole dataframe. As written, parse_and_post runs once on the driver, returns 200 after the very first record (because of the early return inside the loop), and lit(200) then stamps that same constant onto every row. So for me, the following works:

new_df = ctx.spark.createDataFrame((
    ["James", "", "Smith", 36636, "M", 3000, '{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}'],
    ["Michael", "Rose", "", 40288, "M", 4000, '{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}'],
    ["Robert", "", "Williams", 42114, "M", 4000, '{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}'],
    ["Maria", "Anne", "Jones", 39192, "F", 4000, '{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}'],
    ["Jen", "Mary", "Brown", None, "F", -1, '{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}']
)).toDF("firstname", "middlename", "lastname", "id", "gender", "salary", "meta")

import json
from pyspark.sql.functions import udf

@udf()
def parse_and_post(meta):
    result = json.loads(meta)
    print(result["firstname"])
    if result["firstname"] == "James":
        return 200
    else:
        return -1

new_df_status = new_df.withColumn(
    "status_of_each_record", parse_and_post(new_df.meta))

In your example you expected the whole dataframe to be the input to parse_and_post, but here we only expect one row at a time. This also simplifies how we create the new column.
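
Since the real goal (going by the placeholder comments in the question's code) is to send each record to an API, the per-row UDF could be fleshed out roughly like the sketch below. This is only a sketch under assumptions: the endpoint URL, the get_token helper and the use of the requests library are made up for illustration and are not part of the original code.

import json
import requests  # assumed HTTP client; any equivalent works
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

API_URL = "https://example.com/records"  # hypothetical endpoint

def get_token():
    # placeholder for whatever auth flow your API actually requires
    return "dummy-token"

@udf(IntegerType())
def parse_and_post(meta):
    record = json.loads(meta)  # format the record
    token = get_token()        # get token
    # send the record to the API and return the response status
    response = requests.post(
        API_URL,
        json=record,
        headers={"Authorization": f"Bearer {token}"},
    )
    return response.status_code

new_df_status = new_df.withColumn(
    "status_of_each_record", parse_and_post(new_df.meta))

Note that this issues one HTTP call per row; the bulk approach further down avoids that.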

Do you need a UDF at all?

The second thing you might want to consider is whether you can do without the UDF entirely. UDFs are a performance killer, and you can often manage without them. For example:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import functions as f

# Let spark know what shape of json data to expect. We can ignore
# fields we don't care about without it being a problem
schema = StructType([StructField("firstname", StringType())])

new_df_status = new_df.withColumn(
    "status_of_each_record",
    f.when(f.from_json(new_df.meta, schema).firstname == "James", 200)
     .otherwise(-1)
)
new_df_status.show()

Even assuming you've given a toy example, it's worth letting Spark do as much of the heavy lifting as possible (such as the JSON parsing), because that part of the work scales.

For record-by-record ingestion, refer to @Jon Betts's approach.

In case you do a bulk POST to the API and the API accepts an array of meta records, you can do the following. This reduces the number of API calls and usually performs better.

You can first build a list of the JSON meta strings per group.

If id is well distributed:

from pyspark.sql import functions as F

num_split = 10  # depends on how big your data is and how much the API can handle

df = (df.groupBy(F.col('id') % num_split)
      .agg(F.collect_list('meta').alias('meta')))

@F.udf
def _call_bulk_api(meta_list):
    # call the bulk API (PATCH)
    # The returned status is highly dependent on the API.
    return 200

df = df.withColumn('status', _call_bulk_api(F.col('meta')))

If id is not well distributed, create an incremental id:

from pyspark.sql import Window

df = (df.withColumn('sid', F.row_number().over(Window.orderBy('id')))
      .groupBy(F.col('sid') % num_split)
      .agg(F.collect_list('meta').alias('meta')))
