如何避免在spark (python)中使用for循环



我是pySpark的新手,希望有人能帮助我。

我有一个数据帧与一堆航班搜索结果:

+------+-----------+----------+----------+-----+
|origin|destination|      from|        to|price|
+------+-----------+----------+----------+-----+
|   TLV|        NYC|2022-01-01|2022-01-05| 1000|
|   TLV|        ROM|2022-03-01|2022-04-05|  480|
|   TLV|        NYC|2022-01-02|2022-01-04|  990|
|   TLV|        NYC|2022-02-01|2022-03-15| 1200|
|   TLV|        NYC|2022-01-02|2022-01-05| 1100|
|   TLV|        BLR|2022-01-01|2022-01-05| 1480|
|   TLV|        NYC|2022-01-02|2022-01-05| 1010|
+------+-----------+----------+----------+-----+

我想从数据框中获得基于出发地和目的地日期的所有航班价格。

我有一个列表,其中包含一些日期组合,如下所示:

date_combinations = [("2022-01-01", "2022-01-02"), ("2022-01-01", "2022-01-03"), ("2022-01-01", "2022-01-04"),("2022-01-01", "2022-01-05"), ("2022-01-02", "2022-01-03"), ("2022-01-02", "2022-01-04"),("2022-01-02", "2022-01-05"), ("2022-01-03", "2022-01-04"), ("2022-01-03", "2022-01-05"), ("2022-01-04", "2022-01-05") ]

我目前正在做的是过滤每个日期组合的for循环中的数据框架:

for date in date_combinations:
df_date = df.filter((df['from']==date[0])&(df['to']==date[1]))
if df_date.count()==0:
results.append([date, 0])
else:
results.append([date, df_date.collect()[0]['price']])

输出:

[('2022-01-01', '2022-01-02'), 0]
[('2022-01-01', '2022-01-03'), 0]
[('2022-01-01', '2022-01-04'), 0]
[('2022-01-01', '2022-01-05'), 1000]
[('2022-01-02', '2022-01-03'), 0]
[('2022-01-02', '2022-01-04'), 990]
[('2022-01-02', '2022-01-05'), 1100]
[('2022-01-03', '2022-01-04'), 0]
[('2022-01-03', '2022-01-05'), 0]
[('2022-01-04', '2022-01-05'), 0]

输出是OK的,但是我确信有一种更有效的方法来做它,而不是for循环(在大型数据集中,这将需要永远)。

谢谢!

我将首先将date_combinations转换为DataFrame(使用并行化或选择,然后删除重复的数据,如果它来自数据集。

这个想法是在你的日期和data表(我们将调用它)之间做一个左连接。

首先,我们想要清理您的data表并删除重复项,因为您不希望这样做(因为这样左连接也会在匹配记录上创建重复项):

val mainTableFiltered = data.select("from", "to", "price").dropDuplicates("from", "to")

然后,我们以left连接方式将fromto上的日期与这个清理过的表连接起来,因此我们不会丢失记录:

dateCombinations.join(mainTableFiltered, Seq("from", "to"), "left")

那么,创建的记录如果不匹配将为空,因此我们将空替换为0:

.withColumn("price", when(col("price").isNull, 0).otherwise(col("price")))

我们最后按fromto排序以得到相同的结果(如您的示例):

.orderBy("from", "to")

完整代码:

val mainTableFiltered = data.select("from", "to", "price").dropDuplicates("from", "to")
dateCombinations.join(mainTableFiltered, Seq("from", "to"), "left")
.withColumn("price", when(col("price").isNull, 0).otherwise(col("price")))
.orderBy("from", "to")

最终输出:

+----------+----------+-----+
|      from|        to|price|
+----------+----------+-----+
|2022-01-01|2022-01-02|    0|
|2022-01-01|2022-01-03|    0|
|2022-01-01|2022-01-04|    0|
|2022-01-01|2022-01-05| 1000|
|2022-01-02|2022-01-03|    0|
|2022-01-02|2022-01-04|  990|
|2022-01-02|2022-01-05| 1100|
|2022-01-03|2022-01-04|    0|
|2022-01-03|2022-01-05|    0|
|2022-01-04|2022-01-05|    0|
+----------+----------+-----+

您可以从您的日期列表中创建一个df并将两个dfs连接起来:

df = spark.createDataFrame(
[
('TLV','NYC','2022-01-01','2022-01-05','1000')
,('TLV','ROM','2022-03-01','2022-04-05','480')
,('TLV','NYC','2022-01-02','2022-01-04','990')
,('TLV','NYC','2022-02-01','2022-03-15','1200')
,('TLV','NYC','2022-01-02','2022-01-05','1100')
,('TLV','BLR','2022-01-01','2022-01-05','1480')
,('TLV','NYC','2022-01-02','2022-01-05','1010')
],
['origin','destination','from','to','price']
)
df.show()
+------+-----------+----------+----------+-----+
|origin|destination|      from|        to|price|
+------+-----------+----------+----------+-----+
|   TLV|        NYC|2022-01-01|2022-01-05| 1000|
|   TLV|        ROM|2022-03-01|2022-04-05|  480|
|   TLV|        NYC|2022-01-02|2022-01-04|  990|
|   TLV|        NYC|2022-02-01|2022-03-15| 1200|
|   TLV|        NYC|2022-01-02|2022-01-05| 1100|
|   TLV|        BLR|2022-01-01|2022-01-05| 1480|
|   TLV|        NYC|2022-01-02|2022-01-05| 1010|
+------+-----------+----------+----------+-----+
date_combinations = [("2022-01-01", "2022-01-02"), ("2022-01-01", "2022-01-03"), ("2022-01-01", "2022-01-04"),("2022-01-01", "2022-01-05"), ("2022-01-02", "2022-01-03"), ("2022-01-02", "2022-01-04"),("2022-01-02", "2022-01-05"), ("2022-01-03", "2022-01-04"), ("2022-01-03", "2022-01-05"), ("2022-01-04", "2022-01-05") ]
df_date_combinations = spark.createDataFrame(date_combinations, ['from','to'])
df
.join(df_date_combinations, ['from','to'])
.show()
+----------+----------+------+-----------+-----+
|      from|        to|origin|destination|price|
+----------+----------+------+-----------+-----+
|2022-01-01|2022-01-05|   TLV|        NYC| 1000|
|2022-01-01|2022-01-05|   TLV|        BLR| 1480|
|2022-01-02|2022-01-04|   TLV|        NYC|  990|
|2022-01-02|2022-01-05|   TLV|        NYC| 1100|
|2022-01-02|2022-01-05|   TLV|        NYC| 1010|
+----------+----------+------+-----------+-----+

最新更新