我有一个看起来像:
的pyspark dataframe+---------------+---+---+---+---+---+---+
| Entity| id| 7| 15| 19| 21| 27|
+---------------+---+---+---+---+---+---+
| a| 0| 0| 1| 0| 0| 0|
| b| 1| 0| 0| 0| 1| 0|
| c| 2| 0| 0| 0| 1| 0|
| d| 3| 2| 0| 0| 0| 0|
| e| 4| 0| 3| 0| 0| 0|
| f| 5| 0| 25| 0| 0| 0|
| g| 6| 2| 0| 0| 0| 0|
我想在每个列sans ensitity& amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp; amp;ID。实体之后可能有许多列。ID(在这种情况下,有5个,但可能有100或1000或更多)。
这是我到目前为止所拥有的:
random_df = data.select("*").rdd.map(
lambda x, r=random: [Row(str(row)) if isinstance(row, unicode) else
Row(float(r.random() + row)) for row in x]).toDF(data.columns)
但是,这还将为ID列增加一个随机值。通常,如果我以前知道元素的数量,并且我知道它们会被修复
data.select("*").rdd.map(lambda (a,b,c,d,e,f,g):
Row(a,b, r.random() + c r.random() + d, r.random() + e, r.random()
+ f, r.random() + g))
但是,不幸的是,由于不知道我会提前有多少列。想法吗?我真的很感谢您的帮助!
我非常感谢!编辑:我还应该注意," ID"是通话的结果:
data = data.withColumn("id", monotonically_increasing_id())
当我尝试将列" ID"转换为字符串类型时,添加此编辑,以使我的" IsInstance(row,unicode)"将触发,但我没有成功。以下代码:
data = data.withColumn("id", data['id'].cast(StringType)
导致:
raise TypeError("unexpected type: %s" % type(dataType))
TypeError: unexpected type: <class 'pyspark.sql.types.DataTypeSingleton'>
您应该在id
列上尝试.cast("string")
。
import random
import pyspark.sql.functions as f
from pyspark.sql.types import Row
df = sc.parallelize([
['a', 0, 1, 0, 0, 0],
['b', 0, 0, 0, 1, 0],
['c', 0, 0, 0, 1, 0],
['d', 2, 0, 0, 0, 0],
['e', 0, 3, 0, 0, 0],
['f', 0, 25,0, 0, 0],
['g', 2, 0, 0, 0, 0],
]).toDF(('entity', '7', '15', '19', '21', '27'))
df = df.withColumn("id", f.monotonically_increasing_id())
df = df.withColumn("id_string", df["id"].cast("string")).drop("id")
df.show()
random_df = df.select("*").rdd.map(
lambda x, r=random: [Row(str(row)) if isinstance(row, unicode) else
Row(float(r.random() + row)) for row in x]).toDF(df.columns)
random_df.show()