火花阅读压缩与特殊格式



我有一个文件。gz我需要阅读这个文件,并将时间和文件名添加到这个文件中,我有一些问题,需要您的帮助来推荐一种方法。

  1. 由于文件被压缩,第一行读取格式不正确,我认为由于编码问题,我尝试了下面的代码,但不工作

    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
    
  2. 文件具有特殊格式,我需要使用Regex读取它到datafame ==>我发现的唯一方法是使用RDD读取它并将其映射到Regex是否有任何方法将其直接读取到DF并传递Regex ?

    val Test_special_format_RawData = sc.textFile("file://"+filename.toString())
      .map(line ⇒ line.replace("||", "|NA|NA"))
      .map(line ⇒ if (line.takeRight(1) == "|") line+"NA" else line)
      .map { x ⇒ regex_var.findAllIn(x).toArray }
    import hiveSqlContext.implicits._
    val Test_special_format_DF = Test_special_format_RawData.filter { x⇒x.length==30 }
      .filter { x⇒x(0) !=header(0) }
      .map { x⇒ (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7),
                 x(8), x(9), x(10), x(11), x(12), x(13), x(14),
                 x(15),x(16), x(17), x(18), x(19))}.toDF()
    val Test_special_format_Tranformed_Data = Test_special_format_DF.withColumn("FileName", lit(filename.getName))
      .withColumn("rtm_insertion_date", lit(RTM_DATE_FORMAT.format(Cal.getInstance().getTime())))
    
  3. 我可以忽略任何特殊字符之间的任何分隔符,例如,如果"|"管道出现在^~ ^~之间忽略它?

  4. 有时数据帧列类型接收错误的数据类型。我们如何处理这个问题来应用数据质量检查?

  5. 当我试图使用Dataframe从Spark插入到hive时。我可以为下面的代码中使用的"取消处理行"错误指定拒绝目录吗?

    Test_special_format_Tranformed_Data.write.partitionBy("rtm_insertion_date")
      .mode(SaveMode.Append).insertInto("dpi_Test_special_format_source")
    

文件示例在这里

我将回答我关于文件格式的问题。解决方案是覆盖gzib的默认扩展格式。

import org.apache.hadoop.io.compress.GzipCodec
class TmpGzipCodec extends GzipCodec {
  override def getDefaultExtension(): String = ".gz.tmp"
}

现在我们刚刚注册了这个编解码器,在SparkConf:

中设置spark.hadoop.io.compression.codecs
val conf = new SparkConf()
// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")
val sc = new SparkContext(conf)
val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")

我发现这个解决方案是这个链接

对于畸形记录,有以下两种解决方案:

  1. Case作为Case类,然后检查它的模式是否与这个Case类匹配。
  2. 逐行解析RDD,但它需要在spark.csv库中更新。

关于分隔符的问题,需要使用RDD与regex。

相关内容

  • 没有找到相关文章

最新更新