Get the closest exchange rate



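I want to join two dataframes on the currency pair and the date, taking the exchange rate from the second dataframe. I tried the approach referenced here, but datediff only gives the difference between the dates, so it does not give me the correct rate.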

df1:

from_curr  to_curr  Date        value_to_convert
AED        EUR      2017-03-24  2000
AED        EUR      2017-03-27  189
DZD        EUR      2017-01-12  130
EUR        EUR      2020-01-01  11

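You can aggregate the smaller dataframe (df2) so that all dates and rates of a currency pair are collected into a single cell. Then join the dataframes, pick the rate you need from that cell, and divide.

Input: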

from pyspark.sql import functions as F

df1 = spark.createDataFrame(
    [('AED', 'EUR', '2017-03-24', 2000),
     ('AED', 'EUR', '2017-03-27', 189),
     ('DZD', 'EUR', '2017-01-12', 130),
     ('EUR', 'EUR', '2020-01-01', 11)],
    ['from_curr', 'to_curr', 'Date', 'value_to_convert'])
df2 = spark.createDataFrame(
    [('AED', 'EUR', '2017-03-24', -5.123),
     ('AED', 'EUR', '2017-03-26', -9.5),
     ('DZD', 'EUR', '2017-01-01', -6.12)],
    ['transacti', 'local', 'DateTra', 'rate_exchange'])

Script that takes the rate of the closest day (which may be a day in the future):

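# Collect every (date, rate) pair of a currency pair into one array column.
# A separate name keeps the original df2 available for later use.
df2_agg = df2.groupBy('transacti', 'local').agg(
    F.collect_list(F.struct('DateTra', 'rate_exchange')).alias('_vals')
)

# Sort the candidates by distance in days, then by later date first,
# and take the rate of the first element.
rate = F.array_sort(F.transform(
    '_vals',
    lambda x: F.struct(
        F.abs(F.datediff('Date', x.DateTra)).alias('diff'),
        (-F.unix_timestamp(x.DateTra, 'yyyy-MM-dd')).alias('neg_ts'),
        F.abs(x.rate_exchange).alias('rate_exchange')
    )
))[0]['rate_exchange']

df = (df1
    .join(df2_agg, (df1.from_curr == df2_agg.transacti) & (df1.to_curr == df2_agg.local), 'left')
    .select(
        df1['*'],
        # Rows with identical currencies have no rate, so keep their value as is.
        F.coalesce(
            F.col('value_to_convert') / rate,
            F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
        ).alias('value_converted')
    )
)
df.show()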
# +---------+-------+----------+----------------+------------------+
# |from_curr|to_curr|      Date|value_to_convert|   value_converted|
# +---------+-------+----------+----------------+------------------+
# |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
# |      AED|    EUR|2017-03-27|             189|19.894736842105264|
# |      EUR|    EUR|2020-01-01|              11|              11.0|
# |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
# +---------+-------+----------+----------------+------------------+
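
The selection rule used by the sorted struct above (smallest day distance first, the later date winning a tie) can be easier to follow outside Spark. The following is only a plain-Python sketch of that rule for a single currency pair; `closest_rate` is a hypothetical helper introduced here for illustration and is not part of the answer's PySpark code.

```python
from datetime import date

# Hypothetical helper (an assumption for illustration, not from the answer):
# mirrors the sort key (abs day difference, then later date first).
def closest_rate(target, rates):
    """rates: list of (date, rate) tuples for one currency pair."""
    if not rates:
        return None  # no rate known for this pair
    best = min(
        rates,
        key=lambda r: (abs((target - r[0]).days), -r[0].toordinal()),
    )
    return abs(best[1])  # the sample rates are negative, so take abs()

aed_eur = [(date(2017, 3, 24), -5.123), (date(2017, 3, 26), -9.5)]
print(closest_rate(date(2017, 3, 27), aed_eur))  # 9.5
print(closest_rate(date(2017, 3, 25), aed_eur))  # tie, later date wins: 9.5
```

With the sample data, 189 / 9.5 reproduces the 19.894736842105264 shown for the second AED row.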

Script that takes the most recent rate, but never one from the future:

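# df2 here must be the original, un-aggregated rate table.
# Collect (date, rate) pairs per currency pair, sorted with the latest date first.
df2_sorted = df2.groupBy('transacti', 'local').agg(
    F.sort_array(F.collect_list(F.struct('DateTra', 'rate_exchange')), False).alias('_vals')
)

# Keep only rates dated on or before the row's Date; the first one is the latest.
rate = F.abs(F.filter('_vals', lambda x: x.DateTra <= F.col('Date'))[0]['rate_exchange'])

df = (df1
    .join(df2_sorted, (df1.from_curr == df2_sorted.transacti) & (df1.to_curr == df2_sorted.local), 'left')
    .select(
        df1['*'],
        # Rows with identical currencies have no rate, so keep their value as is.
        F.coalesce(
            F.col('value_to_convert') / rate,
            F.when(df1.from_curr == df1.to_curr, df1.value_to_convert)
        ).alias('value_converted')
    )
)
df.show()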
# +---------+-------+----------+----------------+------------------+
# |from_curr|to_curr|      Date|value_to_convert|   value_converted|
# +---------+-------+----------+----------------+------------------+
# |      AED|    EUR|2017-03-24|            2000| 390.3962521959789|
# |      AED|    EUR|2017-03-27|             189|19.894736842105264|
# |      EUR|    EUR|2020-01-01|              11|              11.0|
# |      DZD|    EUR|2017-01-12|             130|21.241830065359476|
# +---------+-------+----------+----------------+------------------+
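
The "latest rate on or before the date" rule is an as-of lookup. As a rough illustration of what the filter on the sorted array selects, here is a plain-Python sketch for one currency pair using the standard `bisect` module; `latest_rate_on_or_before` is a hypothetical name chosen for this example, and ISO-formatted date strings are assumed so that string comparison matches date order.

```python
from bisect import bisect_right

# Hypothetical helper (an assumption for illustration, not from the answer).
def latest_rate_on_or_before(target, rates):
    """rates: list of (iso_date_str, rate), sorted ascending by date."""
    dates = [d for d, _ in rates]
    i = bisect_right(dates, target)  # number of entries dated <= target
    if i == 0:
        return None  # every known rate lies in the future
    return abs(rates[i - 1][1])  # the sample rates are negative, so take abs()

aed_eur = [('2017-03-24', -5.123), ('2017-03-26', -9.5)]
print(latest_rate_on_or_before('2017-03-27', aed_eur))  # 9.5
print(latest_rate_on_or_before('2017-03-25', aed_eur))  # 5.123
print(latest_rate_on_or_before('2017-03-23', aed_eur))  # None
```

The last call shows a case the sample data does not exercise: when no rate exists on or before the date, nothing can be selected. The two scripts produce identical output here only because every row in df1 happens to have its closest rate in the past.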
