I have a log file with more than 100 columns. I only need two of them, "_raw" and "_time", so I loaded the log file as a "csv" DataFrame.
Step 1:
scala> val log = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("soa_prod_diag_10_jan.csv")
log: org.apache.spark.sql.DataFrame = [ARRAffinity: string, CoordinatorNonSecureURL: string ... 126 more fields]
Step 2: I registered the DF as a temp table
log.createOrReplaceTempView("logs")
Step 3: I extracted my two required columns "_raw" and "_time"
scala> val sqlDF = spark.sql("select _raw, _time from logs")
sqlDF: org.apache.spark.sql.DataFrame = [_raw: string, _time: string]
scala> sqlDF.show(1, false)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|_raw |_time|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [tid: [ACTIVE].ExecuteThread: '28' for queue: 'weblogic.kernel.Default (self-tuning)'] [userId: <anonymous>] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b,0] [partition-name: DOMAIN] [tenant-name: GLOBAL] Aggregation Server Not Available. Failed to get remote aggregator[[|null |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
only showing top 1 row
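(For reference, the same two-column projection can also be done without registering a temp view, using the DataFrame API directly; a minimal sketch, assuming the "log" DataFrame from step 1:)
val rawTimeDF = log.select("_raw", "_time")
rawTimeDF.show(1, false)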
My requirement:
I need to split the string in the "_raw" column so that it produces [2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] in separate columns named A, B, C, D, E, F respectively.
Also remove all null values from "_raw" and "_time".
Your answers will be appreciated :)
You can use the split function to split _raw on spaces. It returns an array, and you can then pull the individual values out of that array. Alternatively, you can use the regexp_extract function to extract values from the log message. Both approaches are shown below. I hope this helps.
import org.apache.spark.sql.functions._ // split, regexp_extract, regexp_replace
import spark.implicits._ // toDF and the $"col" syntax (already in scope in spark-shell)

// Creating test data
val df = Seq("[2019-01-10T23:59:59.998-06:00] [xx_yyy_zz_sss_ra10] [ERROR] [OSB-473003] [oracle.osb.statistics.statistics] [tid: [ACTIVE].ExecuteThread: '28' for queue: 'weblogic.kernel.Default (self-tuning)'] [userId: <anonymous>] [ecid: 92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b,0] [partition-name: DOMAIN] [tenant-name: GLOBAL] Aggregation Server Not Available. Failed to get remote aggregator[[")
.toDF("_raw")
// Approach 1: split _raw on spaces and index into the resulting array
val splitDF = df.withColumn("split_raw_arr", split($"_raw", " "))
.withColumn("A", $"split_raw_arr"(0))
.withColumn("B", $"split_raw_arr"(1))
.withColumn("C", $"split_raw_arr"(2))
.withColumn("D", $"split_raw_arr"(3))
.withColumn("E", $"split_raw_arr"(4))
.drop("_raw", "split_raw_arr")
splitDF.show(false)
+-------------------------------+--------------------+-------+------------+----------------------------------+
|A |B |C |D |E |
+-------------------------------+--------------------+-------+------------+----------------------------------+
|[2019-01-10T23:59:59.998-06:00]|[xx_yyy_zz_sss_ra10]|[ERROR]|[OSB-473003]|[oracle.osb.statistics.statistics]|
+-------------------------------+--------------------+-------+------------+----------------------------------+
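Note that splitting on a single space only works for the leading fields; later bracketed fields such as [tid: ...] contain spaces themselves and would get broken apart. As a hedged alternative sketch (my own variant, not part of the original answer), you could split on the "] [" delimiter instead so each bracketed field stays intact:

// Strip the leading "[" and split on "] [" so bracketed fields
// that contain spaces are kept whole
val bracketDF = df
.withColumn("fields", split(regexp_replace($"_raw", "^\\[", ""), "\\] \\["))
.withColumn("A", $"fields"(0))
.withColumn("B", $"fields"(1))
.withColumn("C", $"fields"(2))
.withColumn("D", $"fields"(3))
.withColumn("E", $"fields"(4))
.drop("fields")

bracketDF.show(false)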
// Approach 2: pull each bracketed field out with regexp_extract
val extractedDF = df
.withColumn("a", regexp_extract($"_raw", "\\[(.*?)\\]", 1))
.withColumn("b", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\]", 2))
.withColumn("c", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]", 3))
.withColumn("d", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]", 4))
.withColumn("e", regexp_extract($"_raw", "\\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\] \\[(.*?)\\]", 5))
.withColumn("f", regexp_extract($"_raw", "(?<=ecid: )(.*?)(?=,)", 1))
.drop("_raw")

extractedDF.show(false)
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
|a |b |c |d |e |f |
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
|2019-01-10T23:59:59.998-06:00|xx_yyy_zz_sss_ra10|ERROR|OSB-473003|oracle.osb.statistics.statistics|92b39a8b-8234-4d19-9ac7-4908dc79c5ed-0000bd0b|
+-----------------------------+------------------+-----+----------+--------------------------------+---------------------------------------------+
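The question also asks to remove all null values from "_raw" and "_time"; a minimal sketch for that part, assuming the sqlDF from step 3 (na.drop with an explicit column list keeps only rows where both columns are non-null):
// Keep only rows where both _raw and _time are non-null
val cleanedDF = sqlDF.na.drop(Seq("_raw", "_time"))
cleanedDF.show(1, false)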