如何使用columnrenrensuns以所有列编写,并在Spark Data框架中的自定义分区中加入两个不同的模式



嗨,我有两个文本文件,我必须加入两个文本文件才能创建唯一的文件。我已经使用了Spark中的数据框来实现这一目标。

两个文本文件都具有相同的结构,除某些字段。

现在我必须创建数据框并加入两个数据框架。

问题1:我们如何加入具有一些额外字段的两个数据框架。例如,我的模式首先作为时间戳提交,但我的第一个数据框架没有时间戳字段。

问题2:在我的代码中,我必须重命名所有列才能在加入后选择列,我有29列,所以我必须编写重命名函数29次。有什么办法可以做到这一点,而无需写很多次。

问题3:加入后,我必须根据提交的一些保存输出。例如,如果语句typecode为bal,则所有属于BAL的记录都将转到一个文件,与MAP REDY中的自定义分区相同。

这是我尝试过的latestForEachKey.write.partitionBy("StatementTypeCode"),我希望它应该是正确的。

我知道我在一篇文章中问了很多问题。我正在学习Spark Scala,因此在每个语法和每个概念中都面临问题。我希望我的问题很清楚。

这是我现在正在做的代码。

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
        import org.apache.spark.{ SparkConf, SparkContext }
        import java.sql.{Date, Timestamp}
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types.{ StructType, StructField, StringType, DoubleType, IntegerType,TimestampType }
        import org.apache.spark.sql.functions.udf
       val schema = StructType(Array(
    StructField("TimeStamp", StringType),
    StructField("LineItem_organizationId", StringType),
    StructField("LineItem_lineItemId", StringType),
    StructField("StatementTypeCode", StringType),
    StructField("LineItemName", StringType),
    StructField("LocalLanguageLabel", StringType),
    StructField("FinancialConceptLocal", StringType),
    StructField("FinancialConceptGlobal", StringType),
    StructField("IsDimensional", StringType),
    StructField("InstrumentId", StringType),
    StructField("LineItemLineItemName", StringType),
    StructField("PhysicalMeasureId", StringType),
    StructField("FinancialConceptCodeGlobalSecondary", StringType),
    StructField("IsRangeAllowed", StringType),
    StructField("IsSegmentedByOrigin", StringType),
    StructField("SegmentGroupDescription", StringType),
    StructField("SegmentChildDescription", StringType),
    StructField("SegmentChildLocalLanguageLabel", StringType),
    StructField("LocalLanguageLabel_languageId", StringType),
    StructField("LineItemName_languageId", StringType),
    StructField("SegmentChildDescription_languageId", StringType),
    StructField("SegmentChildLocalLanguageLabel_languageId", StringType),
    StructField("SegmentGroupDescription_languageId", StringType),
    StructField("SegmentMultipleFundbDescription", StringType),
    StructField("SegmentMultipleFundbDescription_languageId", StringType),
    StructField("IsCredit", StringType),
    StructField("FinancialConceptLocalId", StringType),
    StructField("FinancialConceptGlobalId", StringType),
    StructField("FinancialConceptCodeGlobalSecondaryId", StringType),
    StructField("FFFFAction", StringType)))

       val textRdd1 = sc.textFile("s3://trfsdisu/SPARK/Main.txt")
        val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
        var df1 = sqlContext.createDataFrame(rowRdd1, schema).drop("index")
        val textRdd2 = sc.textFile("s3://trfsdisu/SPARK/Incr.txt")
        val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|\^\|", -1)))
        var df2 = sqlContext.createDataFrame(rowRdd2, schema)
        // df2.show(false) 
        import org.apache.spark.sql.expressions._
        val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(TimestampType).desc) 
        val latestForEachKey = df2.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
        .withColumnRenamed("StatementTypeCode", "StatementTypeCode_1").withColumnRenamed("LineItemName", "LineItemName_1").withColumnRenamed("FFAction", "FFAction_1")
  //This is where i need help withColumnRenamed part 

    val df3 = df1.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
          .select($"LineItem_organizationId", $"LineItem_lineItemId",
            when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
            when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"),
            when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction")).filter(!$"FFAction".contains("D"))
        df3.show()

架构部分可以像这样

来解决
val df1 = sqlContext.createDataFrame(rowRdd1, new StructType(schema.tail.toArray))

相关内容

  • 没有找到相关文章

最新更新