Filtering a DataFrame on a dynamic value in Spark/Scala



I have JSON in the following format:

{"Request": {"TrancheList": {"Tranche": [{"TrancheId": "500192163","OwnedAmt": "26500000",    "Curr": "USD" }, {  "TrancheId": "500213369", "OwnedAmt": "41000000","Curr": "USD"}]},"FxRatesList": {"FxRatesContract": [{"Currency": "CHF","FxRate": "0.97919983706115"},{"Currency": "AUD", "FxRate": "1.2966804979253"},{ "Currency": "USD","FxRate": "1"},{"Currency": "SEK","FxRate": "8.1561012531034"},{"Currency": "NOK", "FxRate": "8.2454981641398"},{"Currency": "JPY","FxRate": "111.79999785344"},{"Currency": "HKD","FxRate": "7.7568025218916"},{"Currency": "GBP","FxRate": "0.69425159677867"}, {"Currency": "EUR","FxRate": "0.88991723769689"},{"Currency": "DKK", "FxRate": "6.629598372301"}]},"isExcludeDeals": "true","baseCurrency": "USD"}}

I am trying to get the FxRate value for the currency that equals the baseCurrency tag.

I am reading the JSON from an HDFS cluster:

 val hdfsRequest = spark.read.json("localhost/user/request.json")
val baseCurrency = hdfsRequest.select("Request.baseCurrency")
var fxRates = hdfsRequest.select("Request.FxRatesList.FxRatesContract")
val fxRatesDF = fxRates.select(explode(fxRates("FxRatesContract"))).toDF("FxRatesContract").select("FxRatesContract.Currency", "FxRatesContract.FxRate").filter($"Currency=baseCurrency")
The error I get when I run this code is:
org.apache.spark.sql.AnalysisException: cannot resolve '`Currency=baseCurrency`' given input columns: [Currency, FxRate];

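How do I reference the variable baseCurrency in a DataFrame filter expression in Scala/Spark?

Thanks

If the base currency is just a single value, you can collect it to the driver and compare the column against it: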

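import org.apache.spark.sql.functions.explode
import spark.implicits._  // enables $"..." and the String encoder used by .map

val hdfsRequest = spark.read.json("localhost/user/request.json")
// Collect the single base-currency value to the driver as an Option[String]
val baseCurrency = hdfsRequest.select("Request.baseCurrency")
  .map(_.getString(0)).collect.headOption
val fxRates = hdfsRequest.select("Request.FxRatesList.FxRatesContract")
val fxRatesDF = fxRates.select(explode(fxRates("FxRatesContract")))
  .toDF("FxRatesContract")
  .select("FxRatesContract.Currency", "FxRatesContract.FxRate")
  // Compare the column with the collected value; "" matches no currency if it is missing
  .filter($"Currency" === baseCurrency.getOrElse(""))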
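The original attempt failed because $"Currency=baseCurrency" is read as one column name, which is why the exception lists only [Currency, FxRate] as available columns. The comparison has to be built from a Column on the left and a Scala value on the right. Below is a minimal, self-contained sketch of that idea that can be run locally without HDFS. The names BaseCurrencyFilter, baseRates and sampleJson, the local[1] master, and the trimmed-down sample JSON are assumptions made for illustration, and reading JSON from a Dataset[String] assumes Spark 2.2 or later.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

object BaseCurrencyFilter {
  // Trimmed-down copy of the request from the question
  // (assumption: same shape, fewer rates, one JSON object on one line).
  val sampleJson: String =
    """{"Request":{"FxRatesList":{"FxRatesContract":[{"Currency":"CHF","FxRate":"0.97919983706115"},{"Currency":"USD","FxRate":"1"},{"Currency":"EUR","FxRate":"0.88991723769689"}]},"baseCurrency":"USD"}}"""

  // Returns the (Currency, FxRate) pairs whose currency equals Request.baseCurrency.
  def baseRates(spark: SparkSession, json: String): Array[(String, String)] = {
    import spark.implicits._ // encoders for toDS() and as[...]

    // Parse the in-memory JSON string instead of a file on HDFS.
    val request = spark.read.json(Seq(json).toDS())

    // Pull the single base-currency value to the driver as a plain String.
    val baseCurrency: Option[String] =
      request.select("Request.baseCurrency").as[String].collect().headOption

    request
      // One row per entry of the FxRatesContract array.
      .select(explode(col("Request.FxRatesList.FxRatesContract")).as("c"))
      .select("c.Currency", "c.FxRate")
      // Column === Scala value: the value is turned into a literal.
      .filter(col("Currency") === baseCurrency.getOrElse(""))
      .as[(String, String)]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("base-currency-filter")
      .getOrCreate()
    try baseRates(spark, sampleJson).foreach(println)
    finally spark.stop()
  }
}
```

Collecting the value first is reasonable here because the request carries exactly one baseCurrency. If the base currency could differ per row, joining the exploded rates against the base-currency column would probably be a better fit than collecting to the driver.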
