How to split the input file name and add a specific value as a column in a Spark DataFrame



This is how I load the CSV file into a Spark DataFrame:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{lit, udf}

// Register a UDF that returns the fifth dot-separated segment of the file path.
// String.split takes a regex, so the dot must be escaped as "\\.".
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
// Replace dots in column names with underscores.
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
// Drop columns whose names contain "^", "!" or "_c".
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
// Placeholder column; currently always null.
val df1Final = df1result.withColumn("DataPartition", lit(null: String))

Here are two examples of my input file names:

Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full
Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt

Now I want to take this file name, split it on the "." character, and add CUS as the value of a new column in place of DataPartition.

Can I do this without any UDF?

This is the schema of the existing DataFrame:

root
 |-- LineItem_organizationId: long (nullable = true)
 |-- LineItem_lineItemId: integer (nullable = true)
 |-- StatementTypeCode: string (nullable = true)
 |-- LineItemName: string (nullable = true)
 |-- LocalLanguageLabel: string (nullable = true)
 |-- FinancialConceptLocal: string (nullable = true)
 |-- FinancialConceptGlobal: string (nullable = true)
 |-- IsDimensional: boolean (nullable = true)
 |-- InstrumentId: string (nullable = true)
 |-- LineItemSequence: string (nullable = true)
 |-- PhysicalMeasureId: string (nullable = true)
 |-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
 |-- IsRangeAllowed: boolean (nullable = true)
 |-- IsSegmentedByOrigin: boolean (nullable = true)
 |-- SegmentGroupDescription: string (nullable = true)
 |-- SegmentChildDescription: string (nullable = true)
 |-- SegmentChildLocalLanguageLabel: string (nullable = true)
 |-- LocalLanguageLabel_languageId: integer (nullable = true)
 |-- LineItemName_languageId: integer (nullable = true)
 |-- SegmentChildDescription_languageId: integer (nullable = true)
 |-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
 |-- SegmentGroupDescription_languageId: integer (nullable = true)
 |-- SegmentMultipleFundbDescription: string (nullable = true)
 |-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
 |-- IsCredit: boolean (nullable = true)
 |-- FinancialConceptLocalId: integer (nullable = true)
 |-- FinancialConceptGlobalId: integer (nullable = true)
 |-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
 |-- FFAction: string (nullable = true)

Updated code after the suggested answer:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.{input_file_name, regexp_extract, udf}

    // Keep the returned UserDefinedFunction so it can be applied to a Column below.
    // String.split takes a regex, so the dot must be escaped as "\\.".
    val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(4))
    val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
    val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
    val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
    val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
    // withColumn returns a new DataFrame, so assign the result before printing its schema.
    val df1Final = df1result.withColumn("cus_val", get_cus_val(input_file_name()))
    df1Final.printSchema()

You can use the built-in function input_file_name() to get the file name. After that, you can either create a UDF to extract CUS or use regexp_extract without a UDF.
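
As a rough illustration of the extraction step itself, here is a plain-Scala sketch (no Spark needed) that splits the two sample file names from the question on literal dots and picks the fifth segment. The helper name fifthSegment is made up for this example, and the sketch assumes bare file names; input_file_name() normally returns a full path, so the first segment would also carry the directory part.

```scala
// Sample names copied from the question.
val samples = Seq(
  "Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full",
  "Fundamental.FinancialLineItem.FinancialLineItem.Japan.CUS.1.2017-09-07-1056.Full.txt"
)

// String.split takes a regular expression, so the dot must be escaped as "\\.".
// lift(4) returns None instead of throwing when a name has fewer than five segments.
// fifthSegment is a hypothetical helper name used only in this sketch.
def fifthSegment(name: String): Option[String] = name.split("\\.").lift(4)

val extracted = samples.map(fifthSegment)
```

For both sample names the fifth segment is CUS. A name with fewer than five segments would make the direct (4) index throw, which is why this sketch uses lift.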

Using regexp_extract (a regular expression, no UDF):

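import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
// Capture the word that sits between two literal dots and is followed by a number and another dot, e.g. ".CUS.1."
df.withColumn("cus_val",
  regexp_extract(input_file_name(), "\\.(\\w+)\\.[0-9]+\\.", 1))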
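
The pattern \.(\w+)\.[0-9]+\. can be checked outside Spark before it is handed to regexp_extract. The sketch below uses only the Scala standard library; firstGroup and cusPattern are names invented for this example. One difference to keep in mind: regexp_extract returns an empty string when nothing matches, whereas this helper returns None.

```scala
import scala.util.matching.Regex

// A literal dot, a captured word, a literal dot, one or more digits, a literal dot.
// In a Scala string literal each backslash has to be doubled.
val cusPattern: Regex = "\\.(\\w+)\\.[0-9]+\\.".r

// firstGroup is a hypothetical helper: it returns capture group 1 of the
// leftmost match, or None when the pattern does not occur in the name.
def firstGroup(name: String): Option[String] =
  cusPattern.findFirstMatchIn(name).map(_.group(1))
```

Because the leftmost match wins, a directory component shaped like ".word.123." earlier in the full path would be picked up instead of the intended segment, so the pattern may need tightening for paths that contain dots.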

Using a custom UDF:

import org.apache.spark.sql.functions.udf
// String.split takes a regex, so the dot is escaped; index 4 is the fifth segment.
val get_cus_val = udf((filePath: String) => filePath.split("\\.")(4))
import org.apache.spark.sql.functions.input_file_name
df.withColumn("cus_val", get_cus_val(input_file_name()))
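
The same split-and-index logic can probably also be written without a UDF by combining the built-in split function with Column.getItem. This is a minimal sketch rather than part of the original answer; addCusVal and cusFromPath are names invented here, and it assumes, as the UDF does, that the directory part of the path contains no dots.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{input_file_name, split}

// Column expression mirroring the UDF: split the file path on literal dots
// (the pattern is a regex, hence "\\.") and take the element at index 4.
val cusFromPath: Column = split(input_file_name(), "\\.").getItem(4)

// addCusVal is a hypothetical helper; df is any DataFrame read from files.
def addCusVal(df: DataFrame): DataFrame = df.withColumn("cus_val", cusFromPath)
```

With fewer than five segments, the behavior of getItem(4) depends on the Spark version and ANSI setting (a null value or an error), so it is worth testing against a short file name before relying on it.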
