正在更新spark中的数据帧列



查看新的spark DataFrame API,不清楚是否可以修改数据帧列。

如何更改数据帧的行xy中的值?

pandas中,这将是:

df.ix[x,y] = new_value

编辑:合并下面所说的内容,您不能修改现有的数据帧,因为它是不可变的,但您可以返回具有所需修改的新数据帧。

如果您只想根据条件替换列中的值,如np.where:

from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

如果您想对列执行一些操作,并创建一个添加到数据帧的新列:

import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
    do stuff to column here
    return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))

如果您希望新列与旧列具有相同的名称,您可以添加额外的步骤:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')

虽然不能这样修改列,但可以对列进行操作并返回反映该更改的新DataFrame。为此,您应该首先创建一个实现要应用的操作的UserDefinedFunction,然后选择性地将该函数仅应用于目标列。在Python中:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df现在具有与old_df相同的模式(假设old_df.target_column也是StringType类型),但列target_column中的所有值都将是new_value

通常在更新列时,我们希望将旧值映射到新值。以下是一种在pyspark中不使用UDF的方法:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

DataFrames基于RDD。RDD是不可变的结构,不允许在现场更新元素。要更改值,您需要通过使用类似SQL的DSL或类似map的RDD操作来转换原始DataFrame,从而创建一个新的DataFrame。

强烈推荐的幻灯片:在Spark中为大规模数据科学介绍DataFrames。

正如maasg所说,您可以根据应用于旧DataFrame的映射结果创建新的DataFrame。具有两行的给定DataFrame df的示例:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

请注意,如果列的类型发生了更改,则需要为其提供正确的模式,而不是df.schema。查看org.apache.spark.sql.Row的api以了解可用的方法:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

〔更新〕或者在Scala:中使用UDF

import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

如果列名需要保持不变,您可以将其重命名为:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

pyspark.sql.functions导入col,并根据字符串(字符串a、字符串b、字符串c)将第五列更新为整数(0,1,2)到新的DataFrame中。

from pyspark.sql.functions import col, when 
data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))

相关内容

  • 没有找到相关文章

最新更新