如果你想在连接(join)时使用 int 比较而不是日期比较(日期比较的内部行为取决于集群版本:在 Spark 2.X 中,日期作为字符串进行比较;在 Spark 3 中,如果不设置遗留(legacy)参数,它们可能会被转换为 datetime),并且想避免非等值连接(non-equi join,它会被解析为 BroadcastNestedLoopJoin),可以考虑下面的做法:

import datetime 
import pyspark.sql.functions as F
# Sample input rows: (name, date of birth).
x = [
    ("Month with one digit", datetime.date(2000, 1, 14)),
    ("Last day of month", datetime.date(1800, 5, 31)),
    ("First day of month", datetime.date(1700, 8, 1)),
    ("John", datetime.date(1932, 11, 14)),
    ("Maike", datetime.date(1932, 10, 14)),
]
# NOTE: assumes `spark` is an already-created SparkSession (as in the pyspark
# shell or a notebook); it is not defined in this snippet.
df = spark.createDataFrame(x, schema=["name", "dob"])

# Build a small lookup table mapping a day of the year, encoded as the integer
# month * 100 + day (e.g. Jan 14th -> 114, Nov 14th -> 1114), to a zodiac sign.
# Integers that fall in none of the ranges below are simply not added.
array = []
for mmdd in range(1232):
    if mmdd <= 100:
        # The smallest encoded day is 101 (Jan 1st); nothing to add below it.
        continue
    elif mmdd <= 120:
        array.append((mmdd, 'Cap'))
    elif (120 < mmdd <= 131) or (200 < mmdd <= 218):
        array.append((mmdd, 'Aqu'))
    elif (218 < mmdd <= 229) or (300 < mmdd <= 320):
        array.append((mmdd, 'Pis'))
    elif (320 < mmdd <= 331) or (400 < mmdd <= 420):
        array.append((mmdd, 'Ari'))
    elif (420 < mmdd <= 430) or (500 < mmdd <= 521):
        array.append((mmdd, 'Tau'))
    elif (521 < mmdd <= 531) or (600 < mmdd <= 621):
        array.append((mmdd, 'Gem'))
    elif (621 < mmdd <= 630) or (700 < mmdd <= 722):
        array.append((mmdd, 'Can'))
    elif (722 < mmdd <= 731) or (800 < mmdd <= 823):
        array.append((mmdd, 'Leo'))
    elif (823 < mmdd <= 831) or (900 < mmdd <= 923):
        array.append((mmdd, 'Vir'))
    elif (923 < mmdd <= 930) or (1000 < mmdd <= 1023):
        array.append((mmdd, 'Lib'))
    elif (1023 < mmdd <= 1031) or (1100 < mmdd <= 1122):
        array.append((mmdd, 'Sco'))
    elif (1122 < mmdd <= 1130) or (1200 < mmdd <= 1222):
        array.append((mmdd, 'Sag'))
    elif 1222 < mmdd <= 1231:
        array.append((mmdd, 'Cap'))
zodiacsDf = spark.createDataFrame(array, schema=["date", "name"])

# Drop the year from "yyyy-MM-dd" (keep the last two "-"-separated parts),
# remove the dash, and cast to int so the join key has the same numeric
# meaning as zodiacsDf["date"] instead of relying on implicit string coercion.
df = df.withColumn(
    "dobWithoutYearAsInt",
    F.regexp_replace(F.substring_index(F.col("dob"), "-", -2), "-", "").cast("int"),
)

# Plain equality condition with an explicit broadcast hint on the small
# lookup table; "left" keeps people whose birthday matches no lookup row.
joinedDf = df.join(F.broadcast(zodiacsDf), df["dobWithoutYearAsInt"] == zodiacsDf["date"], "left")
joinedDf.select(df.name, df.dob, zodiacsDf.name).show()





|                name|       dob|name|
|Month with one digit|2000-01-14| Cap|
|   Last day of month|1800-05-31| Gem|
|  First day of month|1700-08-01| Leo|
|                John|1932-11-14| Sco|
|               Maike|1932-10-14| Lib|


== Physical Plan ==
AdaptiveSparkPlan (15)
+- == Final Plan ==
CollectLimit (10)
+- * Project (9)
+- * BroadcastHashJoin LeftOuter BuildRight (8)
:- * LocalLimit (3)
:  +- * Project (2)
:     +- * Scan ExistingRDD (1)
+- BroadcastQueryStage (7), Statistics(sizeInBytes=1032.8 KiB, rowCount=365, isRuntime=true)
+- BroadcastExchange (6)
+- * Filter (5)
+- * Scan ExistingRDD (4)


from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[*]").getOrCreate()

# People: [name, date of birth as a "yyyy-MM-dd" string].
data1 = [
    ["John", "1932-11-14"],
    ["Maike", "1932-10-14"],
]
# Zodiac signs: [sign, first day, last day], days given as zero-padded "MM-dd".
data2 = [
    ["Aries", "03-21", "04-19"],
    ["Taurus", "04-20", "05-20"],
    ["Gemini", "05-21", "06-20"],
    ["Cancer", "06-21", "07-22"],
    ["Leo star", "07-23", "08-22"],
    ["Virgo", "08-23", "09-22"],
    ["Libra", "09-23", "10-22"],
    ["Scorpio", "10-23", "11-21"],
    ["Sagittarius", "11-22", "12-21"],
    ["Capricorn", "12-22", "01-19"],
    ["Aquarius", "01-20", "02-18"],
    ["Pisces", "02-19", "03-20"],
]
df = spark.createDataFrame(data1).toDF("name", "dob")
zodiacSignDf = spark.createDataFrame(data2).toDF("sign", "start", "end")

# Characters 6-10 of "yyyy-MM-dd" are the "MM-dd" part. Zero-padded "MM-dd"
# strings sort in calendar order, so they can be compared directly without
# parsing them into dates.
birthday = col("df.dob").substr(6, 5)
start = col("zodiacSignDf.start")
end = col("zodiacSignDf.end")

# A sign whose range crosses New Year (Capricorn: "12-22" .. "01-19") has
# start > end, so a plain BETWEEN can never match it; such a range matches
# when the birthday is on/after the start OR on/before the end.
wraps_year = start > end
in_sign = (~wraps_year & birthday.between(start, end)) | (
    wraps_year & ((birthday >= start) | (birthday <= end))
)

# Left join keeps every person; the range columns are dropped before display.
df.alias("df").join(
    zodiacSignDf.alias("zodiacSignDf"), in_sign, "left"
).drop("start", "end").show()
| name|       dob|   sign|
| John|1932-11-14|Scorpio|
|Maike|1932-10-14|  Libra|
