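I want to join two dataframes on the currency pair and the date, taking the exchange rate from the second dataframe. I tried the approach referenced here, but datediff
only gives the difference between the dates, so it does not give me the correct rate.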
df1:
from_curr | to_curr | Date | value_to_convert
---|---|---|---
AED | EUR | 2017-03-24 | 2000
AED | EUR | 2017-03-27 | 189
DZD | EUR | 2017-01-12 | 130
EUR | EUR | 2020-01-01 | 11
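You can aggregate the smaller dataframe (df2) so that all the dates and rates of a currency pair are collected into a single cell. Then join the dataframes, pick the rate you need from that cell, and divide by it.
Inputs: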
from pyspark.sql import functions as F
df1 = spark.createDataFrame(
    [('AED', 'EUR', '2017-03-24', 2000),
     ('AED', 'EUR', '2017-03-27', 189),
     ('DZD', 'EUR', '2017-01-12', 130),
     ('EUR', 'EUR', '2020-01-01', 11)],
    ['from_curr', 'to_curr', 'Date', 'value_to_convert'])
df2 = spark.createDataFrame(
    [('AED', 'EUR', '2017-03-24', -5.123),
     ('AED', 'EUR', '2017-03-26', -9.5),
     ('DZD', 'EUR', '2017-01-01', -6.12)],
    ['transacti', 'local', 'DateTra', 'rate_exchange'])
Script to get the rate from the closest day (which may be in the future):
# Collect every (date, rate) pair of a currency pair into one array cell
df2 = df2.groupBy('transacti', 'local').agg(
    F.collect_list(F.struct('DateTra', 'rate_exchange')).alias('_vals')
)
# Sort by (day difference, later date first) and take the rate of the first entry
rate = F.array_sort(F.transform(
    '_vals',
    lambda x: F.struct(
        F.abs(F.datediff('Date', x.DateTra)).alias('diff'),
        -F.unix_timestamp(x.DateTra, 'yyyy-MM-dd').alias('DateTra'),
        F.abs(x.rate_exchange).alias('rate_exchange')
    )
))[0]['rate_exchange']
df = (df1
    .join(df2, (df1.from_curr == df2.transacti) & (df1.to_curr == df2.local), 'left')
    .select(
        df1['*'],
        F.coalesce(
            F.col('value_to_convert') / rate,
            # No rate found: keep the value when both currencies are the same
            F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
        ).alias('value_converted')
    )
)
df.show()
# +---------+-------+----------+----------------+------------------+
# |from_curr|to_curr|      Date|value_to_convert|   value_converted|
# +---------+-------+----------+----------------+------------------+
# |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
# |      AED|    EUR|2017-03-27|             189|19.894736842105264|
# |      EUR|    EUR|2020-01-01|              11|              11.0|
# |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
# +---------+-------+----------+----------------+------------------+
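To make the sort key of the closest-day script easier to follow, here is a minimal plain-Python sketch of the same selection logic, without Spark. The helper name `closest_rate` is hypothetical and only for illustration, and the sketch assumes the dates are ISO `yyyy-MM-dd` strings. The key is a pair: the smallest absolute day difference wins, and on a tie the negated date value puts the later date first.

```python
from datetime import date

def closest_rate(target, rates):
    """Return abs(rate) for the date closest to `target`.

    `rates` is a list of (iso_date, rate) pairs. Ties on the day
    difference go to the later date, mirroring a (diff, -timestamp)
    sort key.
    """
    if not rates:
        return None  # no rate known for this currency pair
    t = date.fromisoformat(target)

    def sort_key(item):
        d = date.fromisoformat(item[0])
        # Smaller day difference first; later date first on a tie
        return (abs((t - d).days), -d.toordinal())

    _, rate = min(rates, key=sort_key)
    return abs(rate)

aed_eur = [('2017-03-24', -5.123), ('2017-03-26', -9.5)]
print(closest_rate('2017-03-27', aed_eur))  # 9.5 (one day away)
print(closest_rate('2017-03-25', aed_eur))  # 9.5 (tie, later date wins)
```

The second call shows the tie-break: 2017-03-25 is one day from both known dates, so the rate from the later one is used.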
Script to get the latest rate, but not one from the future:
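# Run this on the original df2 from Inputs, not on the already aggregated one
# Collect the (date, rate) pairs and sort them by date, newest first
df2 = df2.groupBy('transacti', 'local').agg(
    F.sort_array(F.collect_list(F.struct('DateTra', 'rate_exchange')), False).alias('_vals')
)
# Keep only the dates up to 'Date'; the first remaining entry is the latest one
rate = F.abs(F.filter('_vals', lambda x: x.DateTra <= F.col('Date'))[0]['rate_exchange'])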
df = (df1
    .join(df2, (df1.from_curr == df2.transacti) & (df1.to_curr == df2.local), 'left')
    .select(
        df1['*'],
        F.coalesce(
            F.col('value_to_convert') / rate,
            # No rate found: keep the value when both currencies are the same
            F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
        ).alias('value_converted')
    )
)
df.show()
# +---------+-------+----------+----------------+------------------+
# |from_curr|to_curr|      Date|value_to_convert|   value_converted|
# +---------+-------+----------+----------------+------------------+
# |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
# |      AED|    EUR|2017-03-27|             189|19.894736842105264|
# |      EUR|    EUR|2020-01-01|              11|              11.0|
# |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
# +---------+-------+----------+----------------+------------------+
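Both scripts give the same output on this sample because every closest date happens to lie in the past. As a rough illustration of where they would differ, here is a plain-Python sketch of the "latest rate, not from the future" rule. The function name `latest_rate_not_after` is hypothetical, and the sketch relies on the assumption that dates are ISO `yyyy-MM-dd` strings, which compare chronologically as plain strings.

```python
def latest_rate_not_after(target, rates):
    """Return abs(rate) for the latest date <= target, or None.

    `rates` is a list of (iso_date, rate) pairs.
    """
    # Drop every rate dated after the target day
    past = [(d, r) for d, r in rates if d <= target]
    if not past:
        return None  # every known rate lies in the future
    # Among the remaining pairs, the greatest date is the latest one
    _, rate = max(past, key=lambda item: item[0])
    return abs(rate)

aed_eur = [('2017-03-24', -5.123), ('2017-03-26', -9.5)]
print(latest_rate_not_after('2017-03-27', aed_eur))  # 9.5
print(latest_rate_not_after('2017-03-25', aed_eur))  # 5.123
print(latest_rate_not_after('2017-03-23', aed_eur))  # None
```

For 2017-03-25 this rule picks the rate from 2017-03-24, whereas a closest-day rule with a later-date tie-break would pick the one from 2017-03-26.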