在Pyspark中将数据窗口化为行



我正在准备一个数据集,以开发一个有监督的模型,在给定之前5个值的情况下预测一个值。例如,给定下面的样本数据,我将预测给定列1:5的第6列,或给定列3:7的第8列。

id    1   2   3   4   5   6   7   8   9   10  11  12  13  14  15  16 ...
a    150 110 130  80 136 150 190 110 150 110 130 136 100 150 190 110
b    100 100 130 100 136 100 160 230 122 130  15 200 100 100 136 100
c    130 122 140 140 122 130  15 200 100 100 130 100 136 100 160 230

为此,我想将上面的示例数据重新组织为6列的行,每个切片/窗口都有6个可能的值(例如1:6、2:7、3:8(。我该怎么做?在PySpark/SQL中可能吗?以下输出示例,索引仅供澄清:

1   2   3   4   5   6   
a[1:6]    150 110 130  80 136 150 
a[2:7]    110 130  80 136 150 190
a[3:8]    130  80 136 150 190 110
...
c[1:6]    130 122 140 140 122 130
c[2:7]    122 140 140 122 130  15
...
c[10:16]  130 100 136 100 160 230

您可以将列转换为数组结构数组

from pyspark.sql.functions import struct, explode, array, col
# all columns except the first
cols = df.columns[1:]
# size of the splits
N = 6

使用阵列阵列:

df_new = df.withColumn('dta', explode(array(*[ array(*cols[i:i+N]) for i in range(len(cols)-N+1) ]))) 
.select('id', *[ col('dta')[i].alias(str(i+1)) for i in range(N) ])
df_new.show()
+---+---+---+---+---+---+---+
| id|  1|  2|  3|  4|  5|  6|
+---+---+---+---+---+---+---+
|  a|150|110|130| 80|136|150|
|  a|110|130| 80|136|150|190|
|  a|130| 80|136|150|190|110|
|  a| 80|136|150|190|110|150|
|  a|136|150|190|110|150|110|
|  a|150|190|110|150|110|130|
|  a|190|110|150|110|130|136|
|  a|110|150|110|130|136|100|
|  a|150|110|130|136|100|150|
|  a|110|130|136|100|150|190|
|  a|130|136|100|150|190|110|
|  b|100|100|130|100|136|100|
+---+---+---+---+---+---+---+

使用结构数组(spark 2.4+(:

df_new = df.withColumn('dta', array(*cols)) 
.selectExpr("id", f"""
inline(transform(sequence(0,{len(cols)-N}), i -> ({','.join(f'dta[i+{j}] as `{j+1}`' for j in range(N))})))
""")

上述f字符串中的代码与N=6时的代码相同:

inline(transform(sequence(0,10), i -> struct(dta[i] as `1`, dta[i+1] as `2`, dta[i+2] as `3`, dta[i+3] as `4`, dta[i+4] as `5`, dta[i+5] as `6`)))

是的,您可以使用此代码(并对其进行修改以获得所需内容(:

partitions = []
for row in df.rdd.toLocalIterator():
row_list = list(row)
num_elements = 6
for i in range(0, len(row_list) - num_elements):
partition = row[i : i+num_elements]
partitions.append(partition)
output_df = spark.createDataFrame(partitions)

相关内容

  • 没有找到相关文章

最新更新