如何在PySpark中确定生日的生肖/占星星座



我有以下数据集的名字和出生日期我想创建一个新的列与星座我设法做到这一点在熊猫和python,但我不知道如何继续pyspark,因为我有一个巨大的数据集

例子:

<表类> 名称 捐助 tbody><<tr>约翰1932-11-14迈科1932-10-14

如果你想在连接期间使用int比较而不是日期(这内部取决于你的集群版本,在Spark 2中。在Spark 3中,X日期作为字符串进行比较。(如果你不设置遗留参数,它们可能会被转换为datetime),并避免非equi join,它将解析为broadcastNestedLoopsJoin,你可以这样考虑:

import datetime 
import pyspark.sql.functions as F
x = [
("Month with one digit", datetime.date(2000, 1, 14)),
("Last day of month", datetime.date(1800, 5, 31)),
("First day of month", datetime.date(1700, 8, 1)),
("John", datetime.date(1932, 11, 14)),
("Maike", datetime.date(1932, 10, 14)),
]
df = spark.createDataFrame(x, schema=["name", "dob"])
array = []
for x in range(1232):
if x <= 100:
continue
elif x <= 120:
array.append((x, 'Cap'))
elif x > 120 and x <= 131 or x > 200 and x <= 218:
array.append((x, 'Aqu'))
elif x > 218 and x <= 229 or x > 300 and x <= 320:
array.append((x, 'Pis'))
elif x > 320 and x <= 331 or x > 400 and x <= 420:
array.append((x, 'Ari'))
elif x > 420 and x <= 430 or x > 500 and x <= 521:
array.append((x, 'Tau'))
elif x > 521 and x <= 531 or x > 600 and x <= 621:
array.append((x, 'Gem'))
elif x > 621 and x <= 630 or x > 700 and x <= 722:
array.append((x, 'Can'))
elif x > 722 and x <= 731 or x > 800 and x <= 823:
array.append((x, 'Leo'))
elif x > 823 and x <= 831 or x > 900 and x <= 923:
array.append((x, 'Vir'))
elif x > 923 and x <= 930 or x > 1000 and x <= 1023:
array.append((x, 'Lib'))
elif x > 1023 and x <= 1031 or x > 1100 and x <= 1122:
array.append((x, 'Sco'))
elif x > 1122 and x <= 1130 or x > 1200 and x <= 1222:
array.append((x, 'Sag'))
elif x > 1222 and x <= 1231:
array.append((x, 'Cap'))
zodiacsDf =  spark.createDataFrame(array, schema=["date", "name"])
df = df.withColumn("dobWithoutYearAsInt", F.regexp_replace(F.substring_index(F.col("dob"), "-" , -2), "-", ""))
joinedDf = df.join(F.broadcast(zodiacsDf), df["dobWithoutYearAsInt"] == zodiacsDf["date"], "left")
joinedDf.select(df.name, df.dob, zodiacsDf.name).show()

首先,我正在准备加入的右侧,它的数组与一年中每一天的黄道星座以int格式类似于您在问题中发布的(它涵盖闰年)

然后我添加临时列dobWithoutYearAsInt,这是从左侧表的日期表示为int

然后我正在执行广播连接,它解析为broadcastthastjoin

输出正常

+--------------------+----------+----+
|                name|       dob|name|
+--------------------+----------+----+
|Month with one digit|2000-01-14| Cap|
|   Last day of month|1800-05-31| Gem|
|  First day of month|1700-08-01| Leo|
|                John|1932-11-14| Sco|
|               Maike|1932-10-14| Lib|
+--------------------+----------+----+

计划是这样的:

== Physical Plan ==
AdaptiveSparkPlan (15)
+- == Final Plan ==
CollectLimit (10)
+- * Project (9)
+- * BroadcastHashJoin LeftOuter BuildRight (8)
:- * LocalLimit (3)
:  +- * Project (2)
:     +- * Scan ExistingRDD (1)
+- BroadcastQueryStage (7), Statistics(sizeInBytes=1032.8 KiB, rowCount=365, isRuntime=true)
+- BroadcastExchange (6)
+- * Filter (5)
+- * Scan ExistingRDD (4)

您可以创建一个包含所有生肖及其开始和结束日期的数据框架,然后在两个日期之间连接,代码如下:

spark = SparkSession.builder.master("local[*]").getOrCreate()
data1 = [
["John", "1932-11-14"],
["Maike", "1932-10-14"]
]
data2 = [
["Aries", "03-21", "04-19"],
["Taurus", "04-20", "05-20"],
["Gemini", "05-21", "06-20"],
["Cancer", "06-21", "07-22"],
["Leo star", "07-23", "08-22"],
["Virgo", "08-23", "09-22"],
["Libra", "09-23", "10-22"],
["Scorpio", "10-23", "11-21"],
["Sagittarius", "11-22", "12-21"],
["Capricorn", "12-22", "01-19"],
["Aquarius", "01-20", "02-18"],
["Pisces", "02-19", "03-20"],
]
df = spark.createDataFrame(data1).toDF("name", "dob")
zodiacSignDf = spark.createDataFrame(data2).toDF("sign", "start", "end")
df.alias("df").join(zodiacSignDf.alias("zodiacSignDf"), to_date(col("df.dob").substr(6, 5), 'MM-dd').between(
to_date(col("zodiacSignDf.start"), 'MM-dd'), to_date(col("zodiacSignDf.end"), 'MM-dd')
), "left").drop("start", "end").show()
+-----+----------+-------+
| name|       dob|   sign|
+-----+----------+-------+
| John|1932-11-14|Scorpio|
|Maike|1932-10-14|  Libra|
+-----+----------+-------+

最新更新