我正在准备一个数据集,以开发一个有监督的模型,在给定之前5个值的情况下预测一个值。例如,给定下面的样本数据,我将预测给定列1:5的第6列,或给定列3:7的第8列。
id 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
a 150 110 130 80 136 150 190 110 150 110 130 136 100 150 190 110
b 100 100 130 100 136 100 160 230 122 130 15 200 100 100 136 100
c 130 122 140 140 122 130 15 200 100 100 130 100 136 100 160 230
为此,我想将上面的示例数据重新组织为6列的行,每个切片/窗口都有6个可能的值(例如1:6、2:7、3:8(。我该怎么做?在PySpark/SQL中可能吗?以下输出示例,索引仅供澄清:
1 2 3 4 5 6
a[1:6] 150 110 130 80 136 150
a[2:7] 110 130 80 136 150 190
a[3:8] 130 80 136 150 190 110
...
c[1:6] 130 122 140 140 122 130
c[2:7] 122 140 140 122 130 15
...
c[10:16] 130 100 136 100 160 230
您可以将列转换为数组或结构数组 使用阵列阵列: 使用结构数组(spark 2.4+(: 上述f字符串中的代码与N=6时的代码相同:from pyspark.sql.functions import struct, explode, array, col
# all columns except the first
cols = df.columns[1:]
# size of the splits
N = 6
df_new = df.withColumn('dta', explode(array(*[ array(*cols[i:i+N]) for i in range(len(cols)-N+1) ])))
.select('id', *[ col('dta')[i].alias(str(i+1)) for i in range(N) ])
df_new.show()
+---+---+---+---+---+---+---+
| id| 1| 2| 3| 4| 5| 6|
+---+---+---+---+---+---+---+
| a|150|110|130| 80|136|150|
| a|110|130| 80|136|150|190|
| a|130| 80|136|150|190|110|
| a| 80|136|150|190|110|150|
| a|136|150|190|110|150|110|
| a|150|190|110|150|110|130|
| a|190|110|150|110|130|136|
| a|110|150|110|130|136|100|
| a|150|110|130|136|100|150|
| a|110|130|136|100|150|190|
| a|130|136|100|150|190|110|
| b|100|100|130|100|136|100|
+---+---+---+---+---+---+---+
df_new = df.withColumn('dta', array(*cols))
.selectExpr("id", f"""
inline(transform(sequence(0,{len(cols)-N}), i -> ({','.join(f'dta[i+{j}] as `{j+1}`' for j in range(N))})))
""")
inline(transform(sequence(0,10), i -> struct(dta[i] as `1`, dta[i+1] as `2`, dta[i+2] as `3`, dta[i+3] as `4`, dta[i+4] as `5`, dta[i+5] as `6`)))
是的,您可以使用此代码(并对其进行修改以获得所需内容(:
partitions = []
for row in df.rdd.toLocalIterator():
row_list = list(row)
num_elements = 6
for i in range(0, len(row_list) - num_elements):
partition = row[i : i+num_elements]
partitions.append(partition)
output_df = spark.createDataFrame(partitions)