我有一个文件。gz我需要阅读这个文件,并将时间和文件名添加到这个文件中,我有一些问题,需要您的帮助来推荐一种方法。
-
由于文件被压缩,第一行读取格式不正确,我认为由于编码问题,我尝试了下面的代码,但不工作
implicit val codec = Codec("UTF-8") codec.onMalformedInput(CodingErrorAction.REPLACE) codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
-
文件具有特殊格式,我需要使用Regex读取它到datafame ==>我发现的唯一方法是使用RDD读取它并将其映射到Regex是否有任何方法将其直接读取到DF并传递Regex ?
val Test_special_format_RawData = sc.textFile("file://"+filename.toString()) .map(line ⇒ line.replace("||", "|NA|NA")) .map(line ⇒ if (line.takeRight(1) == "|") line+"NA" else line) .map { x ⇒ regex_var.findAllIn(x).toArray } import hiveSqlContext.implicits._ val Test_special_format_DF = Test_special_format_RawData.filter { x⇒x.length==30 } .filter { x⇒x(0) !=header(0) } .map { x⇒ (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9), x(10), x(11), x(12), x(13), x(14), x(15),x(16), x(17), x(18), x(19))}.toDF() val Test_special_format_Tranformed_Data = Test_special_format_DF.withColumn("FileName", lit(filename.getName)) .withColumn("rtm_insertion_date", lit(RTM_DATE_FORMAT.format(Cal.getInstance().getTime())))
-
我可以忽略任何特殊字符之间的任何分隔符,例如,如果"|"管道出现在^~ ^~之间忽略它?
-
有时数据帧列类型接收错误的数据类型。我们如何处理这个问题来应用数据质量检查?
-
当我试图使用Dataframe从Spark插入到hive时。我可以为下面的代码中使用的"取消处理行"错误指定拒绝目录吗?
Test_special_format_Tranformed_Data.write.partitionBy("rtm_insertion_date") .mode(SaveMode.Append).insertInto("dpi_Test_special_format_source")
文件示例在这里
我将回答我关于文件格式的问题。解决方案是覆盖gzib的默认扩展格式。
import org.apache.hadoop.io.compress.GzipCodec
class TmpGzipCodec extends GzipCodec {
override def getDefaultExtension(): String = ".gz.tmp"
}
现在我们刚刚注册了这个编解码器,在SparkConf:
中设置spark.hadoop.io.compression.codecsval conf = new SparkConf()
// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")
val sc = new SparkContext(conf)
val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
我发现这个解决方案是这个链接
对于畸形记录,有以下两种解决方案:
- Case作为Case类,然后检查它的模式是否与这个Case类匹配。
- 逐行解析RDD,但它需要在spark.csv库中更新。
关于分隔符的问题,需要使用RDD与regex。