我正在尝试从数据中获取最大产品价格。
from pyspark.sql import functions as func
from pyspark.sql import Row
rdd=sc.textFile("/mnt/my_s3_data/retail_db/products/").map(lambda x: x.split(','))
row_rdd=rdd.map(lambda o : Row(product_id=o[0],product_category_id=o[1],product_name=o[2],product_description=o[3],product_price=o[4],product_image=o[5]))
df=row_rdd.toDF()
df.select(func.max('product_price')).show()
结果: 999.99
预期结果:1999.99
df.where(func.col('product_id') == 208).show()
输出: 我要获得的产品价格1999.99
注意:**如果我将数据框架限制为255记录,我将获得预期的输出:
df.limit(255).agg({"product_price": "max"}).show()
,因为product_id为字符串键入最大函数将进行字符串比较以查找最大值而不是数字比较,因此您获得了错误的结果,以避免这种情况,以避免这种情况。将类型投射到数字
以下是示例代码
val updatedDF= df.withColumn("product_id_num",expr("cast (product_id as
double)")).withColumnRenamed("product_id_num","product_id")
现在找到更新的最大值,您将获得准确的结果。
它正在工作
newdf = df.filter(" product_price!=''"( newdf.Select(func.max(newdf.product_price.cast(" double"((。别名('maxtair_price'((。show((