使用 Spark scala 根据行值(示例文件中的标头记录)从单个文件创建多个 RDD



我正在尝试创建多个RDD以基于相似的数据格式独立于以下文件进行处理。

请找到具有不同数据格式的文件

custid,starttime,rpdid,catry,auapp,sppp,retatype,status,process,fileavil
4fgdfg,00:56:30.034,BM_-unit1,GEN,TRUE,FALSE,NONE,A,45,TRUE
X95GEK,00:56:32.083,CBM_OMDD_RSVCM0CBM-unit0,GEN,TRUE,FALSE,NONE,A,GWC,TRUE
XWZ08K,00:57:01.947,GWC-0-UNIT-1,GEN,TRUE,FALSE,NONE,A,GWC,TRUE
custid,relstatus
fg3-03,R
dfsdf4-01,V
56fbfg,R
devid,reg,hold,devbrn,lname,lcon
CTUTANCM0CBM,TRUE,FALSE,13:17:36.934,CBM_BMI_25_5_2,13:43:21.370

在上面的文件中,我们存在三种不同类型的数据格式,我想根据格式将文件拆分为三种不同的RDD。

你能建议如何使用Spark(Scala(实现吗?

您的文件看起来像包含 3 个不同的 csv 文件。

您可以将其作为单个文件读取,并根据每行中的字段数从中提取 3 个 RDD。

// Caching because you'll be filtering it thrice
val topRdd = sc.textFile("file").cache
topRdd.count
//res0: Long = 10
val rdd1 = topRdd.filter(_.split(",", -1).length == 10 )
val rdd2 = topRdd.filter(_.split(",", -1).length ==  2 )
val rdd3 = topRdd.filter(_.split(",", -1).length ==  6 )
rdd1.collect.foreach(println)
// custid,starttime,rpdid,catry,auapp,sppp,retatype,status,process,fileavil
// 4fgdfg,00:56:30.034,BM_-unit1,GEN,TRUE,FALSE,NONE,A,45,TRUE
// X95GEK,00:56:32.083,CBM_OMDD_RSVCM0CBM-unit0,GEN,TRUE,FALSE,NONE,A,GWC,TRUE
// XWZ08K,00:57:01.947,GWC-0-UNIT-1,GEN,TRUE,FALSE,NONE,A,GWC,TRUE
rdd2.collect.foreach(println)
// custid,relstatus
// fg3-03,R
// dfsdf4-01,V
// 56fbfg,R
rdd3.collect.foreach(println)
// devid,reg,hold,devbrn,lname,lcon
// CTUTANCM0CBM,TRUE,FALSE,13:17:36.934,CBM_BMI_25_5_2,13:43:21.370

最新更新