PySpark - Add a column from a list of values



I have to add a column to a PySpark dataframe based on a list of values.

a= spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])

I have a list called rating, which holds a rating for each pet.

rating = [5,4,1]

I need to append a column called Rating to the dataframe, so that it becomes:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+

I have done the following, but it only returns the first value from the rating list for every row:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def add_labels():
    return rating.pop(0)
labels_udf = udf(add_labels, IntegerType())
new_df = a.withColumn('Rating', labels_udf()).cache()

Output:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     5|
| Mouse|  Cat|     5|
+------+-----+------+
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
#sample data
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])
a.show()
#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
#add 'sequential' index and join both dataframe to get the final result
a = a.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
final_df = a.join(b, a.row_idx == b.row_idx).drop("row_idx")
final_df.show()

Input:

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+

The output is:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Cat|  Dog|     4|
|   Dog|  Cat|     5|
| Mouse|  Cat|     1|
+------+-----+------+

As @Tw UxTLi51Nus mentioned, if you can order the dataframe by Animal without changing your results, you can do the following:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def add_labels(indx):
    return rating[indx-1] # since row num begins from 1
labels_udf = udf(add_labels, IntegerType())
a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a.createOrReplaceTempView('a')
a = spark.sql('select row_number() over (order by "Animal") as num, * from a')
a.show()

+---+------+-----+
|num|Animal|Enemy|
+---+------+-----+
|  1|   Dog|  Cat|
|  2|   Cat|  Dog|
|  3| Mouse|  Cat|
+---+------+-----+
new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---+------+-----+------+
|num|Animal|Enemy|Rating|
+---+------+-----+------+
|  1|   Dog|  Cat|     5|
|  2|   Cat|  Dog|     4|
|  3| Mouse|  Cat|     1|
+---+------+-----+------+

Then drop the num column:

new_df.drop('num').show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+

Edit:

Another way, though possibly ugly and somewhat inefficient, if you cannot sort by a column, is to go back to the rdd and do the following:

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
# or create the rdd from the start:
# a = spark.sparkContext.parallelize([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")])
a = a.rdd.zipWithIndex()
a = a.toDF()
a.show()
+-----------+---+
|         _1| _2|
+-----------+---+
|  [Dog,Cat]|  0|
|  [Cat,Dog]|  1|
|[Mouse,Cat]|  2|
+-----------+---+
a = a.select(a._1.getItem('Animal').alias('Animal'), a._1.getItem('Enemy').alias('Enemy'), a._2.alias('num'))
def add_labels(indx):
    return rating[indx] # indx here will start from zero
labels_udf = udf(add_labels, IntegerType())
new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---------+--------+---+------+
|Animal   |   Enemy|num|Rating|
+---------+--------+---+------+
|      Dog|     Cat|  0|     5|
|      Cat|     Dog|  1|     4|
|    Mouse|     Cat|  2|     1|
+---------+--------+---+------+

(I would not recommend this method if you have a lot of data.)

Hope this helps, good luck!

I may be wrong, but I believe the accepted answer will not work. monotonically_increasing_id only guarantees that the IDs will be unique and increasing, not consecutive. Hence, using it on two different dataframes will likely create two very different columns, and the join will mostly return empty.
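A minimal sketch of the problem (my own illustration, not from the original answers; it assumes an active spark session):

from pyspark.sql import functions as F
# Spread six rows across three partitions.
df = spark.range(6).repartition(3)
# monotonically_increasing_id() encodes the partition number in the upper
# bits of the id, so the values jump by 2^33 between partitions rather
# than running 0, 1, 2, ... consecutively.
df.withColumn("mid", F.monotonically_increasing_id()).show()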

Taking inspiration from this answer https://stackoverflow.com/a/48211877/7225303 to a similar question, we can change the incorrect answer to:

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])
a.show()
+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+

#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
b.show()
+------+
|Rating|
+------+
|     5|
|     4|
|     1|
+------+

a = a.withColumn("idx", F.monotonically_increasing_id())
b = b.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
a = a.withColumn("idx", F.row_number().over(windowSpec))
b = b.withColumn("idx", F.row_number().over(windowSpec))
a.show()
+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
|   Dog|  Cat|  1|
|   Cat|  Dog|  2|
| Mouse|  Cat|  3|
+------+-----+---+
b.show()
+------+---+
|Rating|idx|
+------+---+
|     5|  1|
|     4|  2|
|     1|  3|
+------+---+
final_df = a.join(b, a.idx == b.idx).drop("idx")
final_df.show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
You can convert rating to an rdd:

rating = [5,4,1]
ratingrdd = sc.parallelize(rating)

Then convert your dataframe to an rdd, append each value of ratingrdd to the dataframe rdd using zip, and convert the zipped rdd back to a dataframe:

sqlContext.createDataFrame(a.rdd.zip(ratingrdd).map(lambda x: (x[0][0], x[0][1], x[1])), ["Animal", "Enemy", "Rating"]).show()

It should give you:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
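One caveat worth adding (my own note, not part of the original answer): RDD.zip assumes that both RDDs have the same number of partitions and the same number of elements in each partition. Creating the list RDD with a partition count matching the dataframe's RDD makes this more likely to hold:

# Match the partition count of a.rdd; zip still requires equal element
# counts per partition, which parallelize's even splitting usually gives.
ratingrdd = sc.parallelize(rating, a.rdd.getNumPartitions())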

What you are trying to do does not work, because the rating list is in your driver's memory, whereas the a dataframe is in the executors' memory (the udf runs on the executors, too).

What you need to do is add keys to the ratings list, like this:

ratings = [('Dog', 5), ('Cat', 4), ('Mouse', 1)]

Then you create a ratings dataframe from the list and join both to add the new column:

ratings_df = spark.createDataFrame(ratings, ['Animal', 'Rating'])
new_df = a.join(ratings_df, 'Animal')
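A quick check of the result (a sketch; note that the row order after a join is not guaranteed):

new_df.show()

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+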

We can add a new column to a Pandas dataframe, and PySpark provides functionality to convert a Spark dataframe into a Pandas dataframe.

test_spark_df = spark.createDataFrame([(1,'A'), (2, 'B'), (3, 'C')], schema=['id', 'name'])
test_spark_df.show()
+---+----+
| id|name|
+---+----+
|  1|   A|
|  2|   B|
|  3|   C|
+---+----+

Convert this Spark df to a Pandas df.

new_pandas_df = test_spark_df.toPandas()
new_pandas_df['gender'] = ['M', 'F', 'M']
new_pandas_df
    id  name  gender
0   1   A     M
1   2   B     F
2   3   C     M

Convert this Pandas df back to a Spark df.

converted_spark_df = spark.createDataFrame(new_pandas_df)
converted_spark_df.show()
+---+----+------+
| id|name|gender|
+---+----+------+
|  1|   A|     M|
|  2|   B|     F|
|  3|   C|     M|
+---+----+------+
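The same pattern applied to the question's data (a sketch; toPandas() collects the entire dataframe to the driver, so it only suits small data):

rating = [5, 4, 1]
pandas_a = a.toPandas()                       # Spark df -> Pandas df
pandas_a['Rating'] = rating                   # positional column assignment
result_df = spark.createDataFrame(pandas_a)   # back to a Spark df
result_df.show()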

Following your initial idea of using a udf, you can do the following:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

def add_labels(idx):
    lista = [5,4,1]
    return lista[idx]
a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a = a.withColumn("idx", F.monotonically_increasing_id())
a.show()
+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
|   Dog|  Cat|  0|
|   Cat|  Dog|  1|
| Mouse|  Cat|  2|
+------+-----+---+
labels_udf = F.udf(add_labels, IntegerType())
new_df = a.withColumn('Rating', labels_udf(F.col('idx'))).drop('idx')
new_df.show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
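A caveat on this approach (my own note): indexing lista[idx] relies on monotonically_increasing_id() producing exactly 0, 1, 2, which only holds while the dataframe sits in a single partition. Forcing one partition makes that assumption explicit, at the cost of parallelism:

# Collapse to one partition so the generated ids run 0..n-1.
a = a.coalesce(1).withColumn("idx", F.monotonically_increasing_id())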
