我是火花的新手,我想转换下面的源数据帧(从JSON文件加载):
+--+-----+-----+
|A |count|major|
+--+-----+-----+
| a| 1| m1|
| a| 1| m2|
| a| 2| m3|
| a| 3| m4|
| b| 4| m1|
| b| 1| m2|
| b| 2| m3|
| c| 3| m1|
| c| 4| m3|
| c| 5| m4|
| d| 6| m1|
| d| 1| m2|
| d| 2| m3|
| d| 3| m4|
| d| 4| m5|
| e| 4| m1|
| e| 5| m2|
| e| 1| m3|
| e| 1| m4|
| e| 1| m5|
+--+-----+-----+
进入结果数据帧下方:
+--+--+--+--+--+--+
|A |m1|m2|m3|m4|m5|
+--+--+--+--+--+--+
| a| 1| 1| 2| 3| 0|
| b| 4| 2| 1| 0| 0|
| c| 3| 0| 4| 5| 0|
| d| 6| 1| 2| 3| 4|
| e| 4| 5| 1| 1| 1|
+--+--+--+--+--+--+
这是转换规则:
结果数据帧由
A + (n major columns)
组成,其中major
列的名称由指定sorted(src_df.map(lambda x: x[2]).distinct().collect())
结果数据帧包含
m
行,其中A
列的值由提供sorted(src_df.map(lambda x: x[0]).distinct().collect())
结果数据帧中每个主列的值是来自相应
A
和主上的源数据帧的值(例如,源数据帧中第1行中的计数被映射到box
,其中A
是a
,列m1
)源数据帧中
A
和major
的组合没有重复(请将其视为SQL中两列上的主键)
使用zero323的数据帧
df = sqlContext.createDataFrame([
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
("e", 1, "m4"), ("e", 1, "m5")],
("a", "cnt", "major"))
你也可以使用
reshaped_df = df.groupby('a').pivot('major').max('cnt').fillna(0)
让我们从示例数据开始:
df = sqlContext.createDataFrame([
("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"),
("a", 3, "m4"), ("b", 4, "m1"), ("b", 1, "m2"),
("b", 2, "m3"), ("c", 3, "m1"), ("c", 4, "m3"),
("c", 5, "m4"), ("d", 6, "m1"), ("d", 1, "m2"),
("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"),
("e", 1, "m4"), ("e", 1, "m5")],
("a", "cnt", "major"))
请注意,我已将count
更改为cnt
。在大多数SQL方言中,Count是一个保留关键字,它不是列名的好选择。
至少有两种方法可以重塑这些数据:
通过DataFrame 进行聚合
from pyspark.sql.functions import col, when, max majors = sorted(df.select("major") .distinct() .map(lambda row: row[0]) .collect()) cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m) for m in majors] maxs = [max(col(m)).alias(m) for m in majors] reshaped1 = (df .select(col("a"), *cols) .groupBy("a") .agg(*maxs) .na.fill(0)) reshaped1.show() ## +---+---+---+---+---+---+ ## | a| m1| m2| m3| m4| m5| ## +---+---+---+---+---+---+ ## | a| 1| 1| 2| 3| 0| ## | b| 4| 1| 2| 0| 0| ## | c| 3| 0| 4| 5| 0| ## | d| 6| 1| 2| 3| 4| ## | e| 4| 5| 1| 1| 1| ## +---+---+---+---+---+---+
RDD 上的
groupBy
from pyspark.sql import Row grouped = (df .map(lambda row: (row.a, (row.major, row.cnt))) .groupByKey()) def make_row(kv): k, vs = kv tmp = dict(list(vs) + [("a", k)]) return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors}) reshaped2 = sqlContext.createDataFrame(grouped.map(make_row)) reshaped2.show() ## +---+---+---+---+---+---+ ## | a| m1| m2| m3| m4| m5| ## +---+---+---+---+---+---+ ## | a| 1| 1| 2| 3| 0| ## | e| 4| 5| 1| 1| 1| ## | c| 3| 0| 4| 5| 0| ## | b| 4| 1| 2| 0| 0| ## | d| 6| 1| 2| 3| 4| ## +---+---+---+---+---+---+
这是您的原始数据帧:
df.show()
+--+-----+-----+ |A |count|major| +--+-----+-----+ | a| 1| m1| | a| 1| m2| | a| 2| m3| | a| 3| m4| | b| 4| m1| | b| 1| m2| | b| 2| m3| | c| 3| m1| | c| 4| m3| | c| 5| m4| | d| 6| m1| | d| 1| m2| | d| 2| m3| | d| 3| m4| | d| 4| m5| | e| 4| m1| | e| 5| m2| | e| 1| m3| | e| 1| m4| | e| 1| m5| +--+-----+-----+
使用pivot来重塑";"主要";,按";A";,以及";计数";合计为值:
data = ( df.groupBy("A")
.pivot("major")
.sum("count") )
display(data)
+--+--+--+--+--+--+ |A |m1|m2|m3|m4|m5| +--+--+--+--+--+--+ | a| 1| 1| 2| 3| 0| | b| 4| 2| 1| 0| 0| | c| 3| 0| 4| 5| 0| | d| 6| 1| 2| 3| 4| | e| 4| 5| 1| 1| 1| +--+--+--+--+--+--+
@TrentWoodbury的答案很好,我投了赞成票。但是,如果聚合值不是一个数字,它就不起作用,因为.max('cnt')
无法执行所需的聚合。此外,如果您知道不会有重复的值,那么max可能会更慢,或者只是不需要。
以下适用于所有数据类型:
from pyspark.sql.functions import first
reshaped_df = df.groupby('a')
.pivot('major')
.agg(first('cnt'))