I want to do some data preprocessing with PySpark, and I would like to remove rows at the beginning and the end of my dataframe. Say I want to drop the first 30% and the last 30% of the data. I have only found possibilities based on values using where, and on first and last, but nothing that removes several rows at once. Here is a basic example so far, without a solution:
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("foo").getOrCreate()
cut_factor_start = 0.3 # factor to cut the beginning of the data
cut_factor_stop = 1-cut_factor_start # factor to cut the end of the data
# create pandas dataframe
df = pd.DataFrame({'part':['foo','foo','foo','foo','foo', 'foo'], 'values':[9,1,2,2,6,9]})
# convert to spark dataframe
df = spark.createDataFrame(df)
df.show()
+----+------+
|part|values|
+----+------+
| foo|     9|
| foo|     1|
| foo|     2|
| foo|     2|
| foo|     6|
| foo|     9|
+----+------+
What I would like, based on the cut factors, is:
+----+------+
|part|values|
+----+------+
| foo|     1|
| foo|     2|
| foo|     2|
+----+------+
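For reference, the target behavior is a purely positional trim, not a value-based filter. On a plain Python list the same 30% cut can be sketched as follows (trim_ends is a helper name introduced here, not part of the question):

```python
def trim_ends(values, cut_factor):
    """Keep only the middle slice of a sequence, dropping roughly
    the first and last cut_factor of the rows (positional trim)."""
    n = len(values)
    start = int(n * cut_factor)       # number of rows dropped at the front
    stop = int(n * (1 - cut_factor))  # exclusive index of the last row kept
    return values[start:stop]

print(trim_ends([9, 1, 2, 2, 6, 9], 0.3))  # → [1, 2, 2]
```

With six rows and a factor of 0.3, one row is dropped at the front and two at the back, matching the desired output above.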
Another way is to use between after assigning row numbers:
import pyspark.sql.functions as F
from pyspark.sql import Window

# 1-based row positions to keep, derived from the cut factors
# (e.g. for 6 rows: int(6*0.3)+1 = 2 and int(6*0.7) = 4)
cut_start = int(df.count() * cut_factor_start) + 1
cut_stop = int(df.count() * cut_factor_stop)

rnum = F.row_number().over(Window.orderBy(F.lit(0)))
output = (df.withColumn('Rnum', rnum)
          .filter(F.col('Rnum').between(cut_start, cut_stop))
          .drop('Rnum'))
output.show()
+----+------+
|part|values|
+----+------+
| foo|     1|
| foo|     2|
| foo|     2|
+----+------+
In Scala, you can add a unique "id" column and then use the "limit" and "except" functions:
import org.apache.spark.sql.functions.monotonically_increasing_id

val dfWithIds = df.withColumn("uniqueId", monotonically_increasing_id())
dfWithIds
  .limit(stopPositionToCut)
  .except(dfWithIds.limit(startPositionToCut - 1))
  .drop("uniqueId")
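The limit/except trick amounts to a difference of two row prefixes: take the first stopPositionToCut rows, then subtract the first startPositionToCut - 1 rows. The index arithmetic can be checked in plain Python (a sketch on ordered lists, not Spark code; the positions 2 and 4 match the six-row example above):

```python
rows = [9, 1, 2, 2, 6, 9]
start_pos, stop_pos = 2, 4  # 1-based positions of the first and last row to keep

prefix_to_keep = rows[:stop_pos]             # analogue of limit(stopPositionToCut)
prefix_to_drop = rows[:start_pos - 1]        # analogue of limit(startPositionToCut - 1)
kept = prefix_to_keep[len(prefix_to_drop):]  # "except" on ordered prefixes
print(kept)  # → [1, 2, 2]
```

Note that in Spark, except is a set operation with no ordering guarantee, which is why the Scala snippet attaches a unique id first.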