如何将一列与 avro 文件中的其他列映射?



我正在使用Spark 2.1.1和Scala 2.11.8

这个问题是我之前的一个问题的延伸:

如何识别 csv 文件中的空字段?

变化是,我现在不是从 CSV 文件中读取数据,而是从 avro 文件中读取数据。这是我从中读取数据的 avro 文件的格式:

var ttime: Long = 0;
var eTime: Long = 0;
var tids: String = "";
var tlevel: Integer = 0;
var tboot: Long = 0;
var rNo: Integer = 0;
var varType: String = "";
var uids: List[TRUEntry] = Nil;

我正在一个单独的类中解析 avro 文件。

我必须以与上面发布的链接的接受答案中提到的相同方式将 tids 列与每个 uid 映射,除了这次来自 avro 文件而不是格式良好的 csv 文件。我该怎么做?

这是我尝试使用它的代码:

val avroRow = spark.read.avro(inputString).rdd
val avroParsed = avroRow
.map(x => new TRParser(x))
.map((obj: TRParser) => ((obj.tids, obj.uId ),1))
.reduceByKey(_+_)
.saveAsTextFile(outputString)

在 obj.tids 之后,必须单独映射所有 uids 列,以给出与上述链接的接受答案中提到的相同的最终输出。

这就是我解析 avro 文件解析类中所有 uid 的方式:

this.uids = Nil
row.getAs[Seq[Row]]("uids")
.foreach((objRow: Row) => 
this.uids ::= (new TRUEntry(objRow))
)
this.uids    
.foreach((obj:TRUEntry) => {
uInfo += obj.uId + " , " + obj.initM.toString() + " , "
})   

PS:如果这个问题看起来很愚蠢,我深表歉意,但这是我第一次遇到avro文件

可以通过传递相同的循环处理来完成

this.uids 

在主代码中为 :

val avroParsed = avroRow
.map(x => new TRParser(x))
.map((obj: TRParser) => {
val tId = obj.source.trim
var retVal: String = ""
obj.uids
.foreach((obj: TRUEntry) => {
retVal += tId + "," + obj.uId.trim + ":"
})
retVal.dropRight(1)
})
val flattened = avroParsed
.flatMap(x => x.split(":"))
.map(y => ((y),1))

相关内容

最新更新