下面是创建pyspark的代码。sql DataFrame
import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
df = pd.DataFrame(np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]), columns=['a','b','c'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)
所以sparkdf看起来像
a b c
1 2 3
4 5 6
7 8 9
10 11 12
现在我想添加一个新的列numpy数组(甚至一个列表)
new_col = np.array([20,20,20,20])
但是标准方式
sparkdf = sparkdf.withColumn('newcol', new_col)
失败。可能udf是要走的路,但我不知道如何创建一个udf,每个DataFrame行分配一个不同的值,即通过new_col迭代。我看过其他的pyspark和pyspark。SQL,但找不到解决方案。我也需要留在pyspark。SQL不是scala的解决方案。谢谢!
假设数据帧按照数组中值的顺序排序,您可以压缩rdd并重建数据帧,如下所示:
n = sparkdf.rdd.getNumPartitions()
# Parallelize and cast to plain integer (np.int64 won't work)
new_col = sc.parallelize(np.array([20,20,20,20]), n).map(int)
def process(pair):
return dict(pair[0].asDict().items() + [("new_col", pair[1])])
rdd = (sparkdf
.rdd # Extract RDD
.zip(new_col) # Zip with new col
.map(process)) # Add new column
sqlContext.createDataFrame(rdd) # Rebuild data frame
也可以使用join:
new_col = sqlContext.createDataFrame(
zip(range(1, 5), [20] * 4),
("rn", "new_col"))
sparkdf.registerTempTable("df")
sparkdf_indexed = sqlContext.sql(
# Make sure we have specific order and add row number
"SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")
(sparkdf_indexed
.join(new_col, new_col.rn == sparkdf_indexed.rn)
.drop(new_col.rn))
但是窗口函数组件是不可伸缩的,应该避免使用较大的数据集。
当然,如果你需要的只是一个单一值的列,你可以简单地使用lit
import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))