Pyspark-在特定列上运行的lambda表达式



我有一个看起来像:

的pyspark dataframe
+---------------+---+---+---+---+---+---+
|         Entity| id|  7| 15| 19| 21| 27|
+---------------+---+---+---+---+---+---+
|              a|  0|  0|  1|  0|  0|  0|
|              b|  1|  0|  0|  0|  1|  0|
|              c|  2|  0|  0|  0|  1|  0|
|              d|  3|  2|  0|  0|  0|  0|
|              e|  4|  0|  3|  0|  0|  0|
|              f|  5|  0| 25|  0|  0|  0|
|              g|  6|  2|  0|  0|  0|  0|

我想在每个列sans ensitity& amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp;ID。实体之后可能有许多列。ID(在这种情况下,有5个,但可能有100或1000或更多)。

这是我到目前为止所拥有的:

 random_df = data.select("*").rdd.map(
     lambda x, r=random: [Row(str(row)) if isinstance(row, unicode) else 
     Row(float(r.random() + row)) for row in x]).toDF(data.columns)

但是,这还将为ID列增加一个随机值。通常,如果我以前知道元素的数量,并且我知道它们会被修复

data.select("*").rdd.map(lambda (a,b,c,d,e,f,g): 
         Row(a,b, r.random() + c r.random() + d, r.random() + e, r.random() 
               + f, r.random() + g))

但是,不幸的是,由于不知道我会提前有多少列。想法吗?我真的很感谢您的帮助!

我非常感谢!

编辑:我还应该注意," ID"是通话的结果:

data = data.withColumn("id", monotonically_increasing_id())

当我尝试将列" ID"转换为字符串类型时,添加此编辑,以使我的" IsInstance(row,unicode)"将触发,但我没有成功。以下代码:

data = data.withColumn("id", data['id'].cast(StringType)

导致:

raise TypeError("unexpected type: %s" % type(dataType))
TypeError: unexpected type: <class 'pyspark.sql.types.DataTypeSingleton'>

您应该在id列上尝试.cast("string")

import random
import pyspark.sql.functions as f
from pyspark.sql.types import Row
df = sc.parallelize([
    ['a', 0, 1, 0, 0, 0],
    ['b', 0, 0, 0, 1, 0],
    ['c', 0, 0, 0, 1, 0],
    ['d', 2, 0, 0, 0, 0],
    ['e', 0, 3, 0, 0, 0],
    ['f', 0, 25,0, 0, 0],
    ['g', 2, 0, 0, 0, 0],
]).toDF(('entity', '7', '15', '19', '21', '27'))
df = df.withColumn("id", f.monotonically_increasing_id())
df = df.withColumn("id_string", df["id"].cast("string")).drop("id")
df.show()
random_df = df.select("*").rdd.map(
     lambda x, r=random: [Row(str(row)) if isinstance(row, unicode) else
     Row(float(r.random() + row)) for row in x]).toDF(df.columns)
random_df.show()

相关内容

  • 没有找到相关文章

最新更新