How to use withColumn with a condition for each row in a Scala / Spark DataFrame



I have a DataFrame in the following format:

+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
|DataPartition    |TimeStamp                |FFAction|!||IdentifierValue_effectiveFrom|IdentifierValue_effectiveTo|IdentifierValue_identifierEntityId|IdentifierValue_identifierEntityTypeId|IdentifierValue_identifierTypeId|
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+
|SelfSourcedPublic|2018-03-05T11:54:18+00:00|I|!|       |1900-01-01T00:00:00+00:00    |9999-12-31T00:00:00+00:00  |4295903126                        |404010                                |320150                          |
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+

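I want to add an extra column based on a condition on the column below:

IdentifierValue_identifierEntityTypeId

The extra column should be named Partition and follow this rule:

If IdentifierValue_identifierEntityTypeId = 1001371402 then Partition = Repno2FundamentalSeries, else if IdentifierValue_identifierEntityTypeId = 404010 then Partition = Repno2Organization

This is what I am trying: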

 val temp = temp1.withColumn("Partition", when($"IdentifierValue_identifierEntityTypeId" === "404010", 0).otherwise("Repno2FundamentalSeries"))
    temp.show(false)

I get the output below, but the value is zero:

+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+---------+
|DataPartition    |TimeStamp                |FFAction|!||IdentifierValue_effectiveFrom|IdentifierValue_effectiveTo|IdentifierValue_identifierEntityId|IdentifierValue_identifierEntityTypeId|IdentifierValue_identifierTypeId|Partition|
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+---------+
|SelfSourcedPublic|2018-03-05T11:54:18+00:00|I|!|       |1900-01-01T00:00:00+00:00    |9999-12-31T00:00:00+00:00  |4295903126                        |404010                                |320150                          |0        |
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+---------+

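I am new to Scala, so this is a basic question.

How do I write when and otherwise for multiple conditions on a column? The following does not work for me and fails with this error:

Exception in thread "main" java.lang.IllegalArgumentException: otherwise() can only be applied once on a Column previously generated by when()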

val dataMain = dataMain1.withColumn(
      "Partition",
      when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "Fundamental", "Instrument2Fundamental")
        .otherwise(when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "FundamentalSeries", "Instrument2FundamentalSeries"))
        .otherwise(when($"RelationObjectId_relatedObjectType" === "Organization" && $"RelationObjectId_relatedObjectType" === "Fundamental", "Organization2Fundamental"))
        .otherwise(when($"RelationObjectId_relatedObjectType" === "Organization" && $"RelationObjectId_relatedObjectType" === "FundamentalSeries", "Organization2FundamentalSeries"))
        )

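Based on the condition you gave, you should change the when condition as shown below.

If IdentifierValue_identifierEntityTypeId = 1001371402 then Partition = Repno2FundamentalSeries, else if IdentifierValue_identifierEntityTypeId = 404010 then Partition = Repno2Organization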

df1.withColumn("Partition",
  when($"IdentifierValue_identifierEntityTypeId" === "1001371402", "Repno2FundamentalSeries")
    .otherwise("Repno2Organization")
)

Output:

+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+-----------------------+
|DataPartition    |TimeStamp                |FFAction|!||IdentifierValue_effectiveFrom|IdentifierValue_effectiveTo|IdentifierValue_identifierEntityId|IdentifierValue_identifierEntityTypeId|IdentifierValue_identifierTypeId|Partition              |
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+-----------------------+
|SelfSourcedPublic|2018-03-05T11:54:18+00:00|I|!|       |1900-01-01T00:00:00+00:00    |9999-12-31T00:00:00+00:00  |4295903126                        |404010                                |320150                          |Repno2Organization     |
+-----------------+-------------------------+-----------+-----------------------------+---------------------------+----------------------------------+--------------------------------------+--------------------------------+-----------------------+

Edit:

Here is how to write nested when expressions:

val dataMain = df.withColumn(
  "Partition",
  // Note: each condition tests the same column against two different values, so as written
  // it can never be true; the second comparison presumably belongs to a different column.
  when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "Fundamental", "Instrument2Fundamental")
    .otherwise(
      when($"RelationObjectId_relatedObjectType" === "EDInstrument" && $"RelationObjectId_relatedObjectType" === "FundamentalSeries", "Instrument2FundamentalSeries")
        .otherwise(
          when($"RelationObjectId_relatedObjectType" === "Organization" && $"RelationObjectId_relatedObjectType" === "Fundamental", "Organization2Fundamental")
            // The innermost when has no otherwise, so rows matching no condition get null.
            .otherwise(when($"RelationObjectId_relatedObjectType" === "Organization" && $"RelationObjectId_relatedObjectType" === "FundamentalSeries", "Organization2FundamentalSeries"))
        )
    )
)

Hope this helps.
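
A flatter way to express "if / else if / else" is to chain conditions, since a Column produced by when has its own when method and otherwise is then called only once at the end, which also avoids the IllegalArgumentException from the question. Below is a minimal sketch using the two ids from the question. The local SparkSession, the sample rows (including the id "999") and the "Unknown" fallback label are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

// Local session for illustration; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().master("local[1]").appName("when-chain-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data: the two ids from the question plus one unmatched id.
val sample = Seq("1001371402", "404010", "999").toDF("IdentifierValue_identifierEntityTypeId")

val withPartition = sample.withColumn(
  "Partition",
  when($"IdentifierValue_identifierEntityTypeId" === "1001371402", "Repno2FundamentalSeries")
    .when($"IdentifierValue_identifierEntityTypeId" === "404010", "Repno2Organization")
    .otherwise("Unknown") // assumed fallback label, not part of the question
)

// Collect into an id -> Partition map so the result is easy to inspect.
val byId = withPartition.as[(String, String)].collect().toMap
```

Leaving out the final otherwise is also valid; rows that match none of the conditions then get null in Partition.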

An alternative way to achieve this is to use a SQL-style CASE WHEN expression instead of withColumn.

If you are familiar with SQL, this may be easier to write.

For example:

       // cond2, value2 and defaultValue are placeholders for your own conditions and values.
       val dataMain = dataMain1.selectExpr("*",
       """CASE WHEN RelationObjectId_relatedObjectType = 'EDInstrument'
       THEN 'Instrument2Fundamental'
       WHEN cond2
       THEN value2
       ELSE defaultValue END AS partition""")
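
To make the template above concrete, here is a sketch of the same CASE WHEN approach applied to the ids from the original question. As before, the local SparkSession, the sample rows and the 'Unknown' default are assumptions for illustration, not something the question specifies.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().master("local[1]").appName("case-when-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample data: the two ids from the question plus one unmatched id.
val caseSample = Seq("1001371402", "404010", "999").toDF("IdentifierValue_identifierEntityTypeId")

// "*" keeps every existing column; the CASE expression adds the new Partition column.
val withCasePartition = caseSample.selectExpr(
  "*",
  """CASE
    |  WHEN IdentifierValue_identifierEntityTypeId = '1001371402' THEN 'Repno2FundamentalSeries'
    |  WHEN IdentifierValue_identifierEntityTypeId = '404010' THEN 'Repno2Organization'
    |  ELSE 'Unknown'
    |END AS Partition""".stripMargin
)

// Collect into an id -> Partition map so the result is easy to inspect.
val caseById = withCasePartition.as[(String, String)].collect().toMap
```

Column names containing special characters, such as FFAction|!| in the sample data, would need backticks if referenced inside the SQL string.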
