我正在尝试提取Spark DataFrame中列"ID"的最大值,并在执行插入时递增。
我能够打印最大值,但无法将其存储在变量中
max_id = df.agg({"ID":"max"}).collect()[0]
print max_id["max(ID)"]
id2 = max_id["max{"ID"}]
引发错误
语法错误:语法无效
如何将其提取到变量中并递增?
请考虑以下数据帧:
l1 = [(1,2),(3,4),(5,6), (12,537)]
df1 = spark.createDataFrame(l1,['ID','col2'])
max_id=df1.agg({"ID":"max"}).collect()[0]
变量max_id是一个 pyspark.sql.types.Row。您可以使用 type(( 进行检查:
type(max_id)
输出:
<class 'pyspark.sql.types.Row'>
pyspark.sql.types.Row 的元素可以像带有方括号的字典一样访问:
max_id['max(ID)']
所以你想要改变的只是你的max_id作业:
max_id=df1.agg({"ID":"max"}).collect()[0]['max(ID)']
type(max_id)
输出:
<class 'int'>
max_id现在是一个可以递增的整数:
max_id = max_id+1
上面的代码的问题是,你使用了大括号。
我想出了如何将列的最大值提取为 int。
maxid=int(df.describe("ID"(.filter("summary = 'max'"(.select("ID"(.collect(([0].asDict((['ID']
打印最大---结果 :3
id2=maxid+1打印 ID2---结果4
df = spark.range(5)
如果每列只需要一个聚合,则可以.agg({'id':'max'})
:
max_id = df.agg({'id':'max'}).collect()[0][0]
print(max_id)
# 4
如果每列需要更多聚合,则.agg({'id':'max', 'id':'min'}
不起作用(仅返回最后一个聚合(。
您需要.select
:
from pyspark.sql import functions as F
max_min = df.select(F.max('id'), F.min('id')).collect()[0]
max_id, min_id = max_min[0], max_min[1]
print(max_id, min_id)
# 4 0