Looping over a Spark DataFrame gets slower on every iteration



I'm new to Spark, and I can't seem to find the right way to iterate over a DataFrame multiple times.

I'm trying to loop over a Spark DataFrame 10 times, fetching the matching results for a different date on each pass, but every iteration takes longer than the one before. I tried calling unpersist(), but it didn't help.

I hope someone can help me.

import findspark
findspark.init()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from itertools import combinations
import datetime
spark = (
    SparkSession.builder.appName("Practice")
    .master("local[*]")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)
df = spark.read.parquet('spark-big-dataparquet_small_example.parquet')
res = []
for date in range(10):
    df = df.withColumn('fs_origin', df.request.Segments.getItem(0)['Origin'])
    df = df.withColumn('fs_destination', df.request.Segments.getItem(0)['Destination'])
    df = df.withColumn('fs_date', df.request.Segments.getItem(0)['FlightTime'])
    df = df.withColumn('ss_origin', df.request.Segments.getItem(1)['Origin'])
    df = df.withColumn('ss_destination', df.request.Segments.getItem(1)['Destination'])
    df = df.withColumn('ss_date', df.request.Segments.getItem(1)['FlightTime'])
    df = df.withColumn('full_date', F.concat_ws('-', df.year, df.month, df.day))
    df = df.filter(
        (df["fs_origin"] == 'TLV') & (df["fs_destination"] == 'NYC')
        & (df["ss_origin"] == 'NYC') & (df['ss_destination'] == 'TLV')
        & (df['fs_date'] == '2021-02-' + str(date) + 'T00:00:00')
        & (df['ss_date'] == '2021-02-16' + 'T00:00:00')
    )
    if df.count() == 0:
        res.append(0)
    else:
        df = df.sort(F.unix_timestamp("full_date", "yyyy-M-d").desc())
        latest_day = df.collect()[0]['full_date']

        df = df.filter(df['full_date'] == latest_day)
        df = df.withColumn("exploded_data", F.explode("response.results"))
        df = df.withColumn(
            "price",
            F.col("exploded_data").getItem('PriceInfo').getItem('Price')  # either get by name or by index e.g. getItem(0) etc
        )
        res.append(df.sort(df.price.asc()).collect()[0]['price'])

    df.unpersist()
    spark.catalog.clearCache()

Can you try this? The idea is to build the derived columns and apply the route filter once, persist() that result (materialising it with a count()), and keep only the per-date filter inside the loop, so each iteration reads from the cache instead of rebuilding the whole plan from the parquet file:

import findspark
findspark.init()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from itertools import combinations
import datetime
spark = (
    SparkSession.builder.appName("Practice")
    .master("local[*]")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)
df = spark.read.parquet('spark-big-dataparquet_small_example.parquet')
# build the derived columns once, outside the loop
df = df.withColumn('fs_origin', df.request.Segments.getItem(0)['Origin'])
df = df.withColumn('fs_destination', df.request.Segments.getItem(0)['Destination'])
df = df.withColumn('fs_date', df.request.Segments.getItem(0)['FlightTime'])
df = df.withColumn('ss_origin', df.request.Segments.getItem(1)['Origin'])
df = df.withColumn('ss_destination', df.request.Segments.getItem(1)['Destination'])
df = df.withColumn('ss_date', df.request.Segments.getItem(1)['FlightTime'])
df = df.withColumn('full_date', F.concat_ws('-', df.year, df.month, df.day))

# filter on the fixed route once and cache the result;
# the count() is an action that materialises the cache
df = df.filter(
    (df["fs_origin"] == 'TLV') & (df["fs_destination"] == 'NYC')
    & (df["ss_origin"] == 'NYC') & (df['ss_destination'] == 'TLV')
).persist()
df.count()

res = []
for date in range(10):
    # only the cheap per-date filter runs inside the loop, against the cached data
    df_date = df.filter(
        (df['fs_date'] == '2021-02-' + str(date) + 'T00:00:00')
        & (df['ss_date'] == '2021-02-16' + 'T00:00:00')
    )
    if df_date.count() == 0:
        res.append(0)
    else:
        df_date = df_date.sort(F.unix_timestamp("full_date", "yyyy-M-d").desc())
        latest_day = df_date.collect()[0]['full_date']
        df_date = df_date.filter(df_date['full_date'] == latest_day)
        df_date = df_date.withColumn("exploded_data", F.explode("response.results"))
        df_date = df_date.withColumn(
            "price",
            F.col("exploded_data").getItem('PriceInfo').getItem('Price')  # either get by name or by index e.g. getItem(0) etc
        )
        res.append(df_date.sort(df_date.price.asc()).collect()[0]['price'])

# release the cache once the loop is done
df.unpersist()
spark.catalog.clearCache()
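
If you want to avoid the per-date loop (and its separate count()/collect() jobs) altogether, you could also compute everything in a single aggregation. The following is only a rough sketch that reuses the route-filtered df and the columns defined above (fs_date, ss_date, full_date, response.results); the window over fs_date is my approximation of your "latest full_date per outbound date" step, so treat it as a starting point rather than a drop-in replacement:

from pyspark.sql import Window

# restrict to the fixed return date and the ten outbound dates from the loop
dates = ['2021-02-' + str(d) + 'T00:00:00' for d in range(10)]
w = Window.partitionBy("fs_date")
summary = (
    df.filter((df['ss_date'] == '2021-02-16T00:00:00') & (df['fs_date'].isin(dates)))
      .withColumn("full_ts", F.unix_timestamp("full_date", "yyyy-M-d"))
      .withColumn("latest_ts", F.max("full_ts").over(w))    # latest full_date per outbound date
      .filter(F.col("full_ts") == F.col("latest_ts"))
      .withColumn("exploded_data", F.explode("response.results"))
      .withColumn("price", F.col("exploded_data").getItem('PriceInfo').getItem('Price'))
      .groupBy("fs_date")
      .agg(F.min("price").alias("min_price"))                # cheapest price per outbound date
)
summary.show()

This replaces ten separate jobs with one; run it before df.unpersist() so it still benefits from the cache. Note that outbound dates with no matching rows simply don't appear in the output, whereas your loop appends 0 for them.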