Transforming a DStream RDD using external data



We are developing a Spark Streaming ETL application that reads data from Kafka, applies the necessary transformations, and loads the data into MongoDB. The data received from Kafka is in JSON format. The transformations are applied to each element of the RDD (a JSON string) based on lookup data fetched from MongoDB. Because the lookup data changes, I need to fetch it for every batch interval. The lookup data is read from MongoDB using SqlContext.read. I cannot use SqlContext.read inside DStream.transform because SqlContext is not serializable, so I cannot broadcast it to the worker nodes. I am now trying DStream.foreachRDD, inside which I fetch the data from MongoDB and broadcast the lookup data to the workers. All transformations on the RDD elements are performed inside an rdd.map closure, which uses the broadcast data, applies the transformations, and returns an RDD. That RDD is then converted to a DataFrame and written to MongoDB. At the moment, this application runs very slowly.

PS: If I move the part of the code that fetches the lookup data out of DStream.foreachRDD, add a DStream.transform to apply the transformations, and have DStream.foreachRDD only insert the data into MongoDB, the performance is very good. However, with this approach the lookup data is not refreshed for every batch interval.
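Roughly, that faster variant can be wired as in the sketch below. This is a simplified sketch, not the exact code: it assumes the lookup collection is small enough to be collected on the driver and broadcast as a plain key/param1 Map (an assumption on my part), it reuses the names and imports from the pseudo code further down, and it leaves the JSON restructuring as a placeholder.

// Lookup is read, collected, and broadcast once at start-up, so it is never refreshed.
val lookupDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("spark.mongodb.input.uri", DBConnectionURI)
  .option("spark.mongodb.input.collection", "lookupCollection1")
  .load()
// Assumes "key" and "param1" are string columns; only a serializable Map is broadcast.
val lookupMap = lookupDf.select("key", "param1").rdd
  .map(r => r.getString(0) -> r.getString(1))
  .collectAsMap()
val broadcastLkpMap = sc.broadcast(lookupMap)

// DStream.transform does the per-element mapping on the executors.
val transformedStream = KafkaDStream.transform { rdd =>
  rdd.map { elem =>
    val json: JValue = parse(elem)
    val key = "value"                     // extracted from the parsed json in the real code
    val param1 = broadcastLkpMap.value.getOrElse(key, "constant")
    // param1 would feed into a new JSON with a different structure (placeholder here)
    compact(render(""""""))
  }
}

// foreachRDD only converts each batch to a DataFrame and writes it to MongoDB.
val targetSchema = new StructType(Array(StructField("key1", StringType, true),
  StructField("key2", TimestampType, true)))
transformedStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    sqlContext.read.schema(targetSchema).json(rdd).write
      .option("spark.mongodb.output.uri", DBConnectionURI)
      .option("collection", "tagetCollectionName")
      .mode("append").format("com.mongodb.spark.sql").save()
  }
}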

I am looking for help in understanding whether this is a good approach, and for some guidance on improving the performance.

Below is the pseudo code:

package com.testing

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructField, StringType, StructType, TimestampType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object Pseudo_Code {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Pseudo_Code")
      .setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(1))

    val mongoIP = "127.0.0.1:27017"
    val DBConnectionURI = "mongodb://" + mongoIP + "/" + "DBName"
    val bootstrap_server_config = "127.0.0.100:9092"
    val zkQuorum = "127.0.0.101:2181"
    val group = "streaming"
    val TopicMap = Map("sampleTopic" -> 1)

    // Receiver-based Kafka stream; keep only the message value (the JSON string)
    val KafkaDStream = KafkaUtils.createStream(ssc, zkQuorum, group, TopicMap).map(_._2)

    KafkaDStream.foreachRDD { rdd =>
      if (rdd.count() > 0) {
        // This lookup data has the information required to perform the transformation.
        // It keeps changing, so it should be fetched for every batch interval.
        val lookup1 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
          .option("spark.mongodb.input.uri", DBConnectionURI)
          .option("spark.mongodb.input.collection", "lookupCollection1")
          .load()
        val broadcastLkp1 = sc.broadcast(lookup1)

        val new_rdd = rdd.map { elem =>
          val json: JValue = parse(elem)
          // For each element in the rdd, some values must be looked up from the broadcast lookup data.
          // "value" is extracted from the parsed json.
          val param1 = broadcastLkp1.value.filter(broadcastLkp1.value("key") === "value").select("param1").distinct()
          val param1ReplaceNull = if (param1.count() == 0) {
            "constant"
          } else {
            param1.head().getString(0)
          }
          // Create a new JSON with a different structure
          val new_JSON = """"""
          compact(render(new_JSON))
        }

        val targetSchema = new StructType(Array(StructField("key1", StringType, true),
          StructField("key2", TimestampType, true)))
        val transformedDf = sqlContext.read.schema(targetSchema).json(new_rdd)

        transformedDf.write
          .option("spark.mongodb.output.uri", DBConnectionURI)
          .option("collection", "tagetCollectionName")
          .mode("append").format("com.mongodb.spark.sql").save()
      }
    }

    // Run the streaming job
    ssc.start()
    ssc.awaitTermination()
  }
}

After some research, the solution that worked was to cache the broadcast DataFrame after it has been read on the workers. Below is the code change I had to make to improve the performance.

val new_rdd = rdd.map { elem =>
  val json: JValue = parse(elem)
  // For each element in the rdd, some values must be looked up from the broadcast lookup data.
  // "value" is extracted from the parsed json.
  val lkp_bd = broadcastLkp1.value
  lkp_bd.cache()
  val param1 = lkp_bd.filter(broadcastLkp1.value("key") === "value").select("param1").distinct()
  val param1ReplaceNull = if (param1.count() == 0) {
    "constant"
  } else {
    param1.head().getString(0)
  }
  // Create a new JSON with a different structure
  val new_JSON = """"""
  compact(render(new_JSON))
}

As a side note, there is an issue with this approach when running on a cluster: a NullPointerException is thrown when accessing the broadcast DataFrame. I have created another thread for it: Spark Streaming - Null pointer exception when accessing a broadcast variable.
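One workaround that keeps the per-batch refresh while avoiding any DataFrame access on the executors is sketched below. This is again a simplified sketch and an assumption on my part, not the code from that thread: collect the lookup rows to the driver at the start of every batch and broadcast a plain Map, so the executors only ever dereference serializable data.

KafkaDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Runs on the driver for every batch interval, so the lookup stays fresh.
    // Assumes "key" and "param1" are string columns and the collection fits in driver memory.
    val lookupMap = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
      .option("spark.mongodb.input.uri", DBConnectionURI)
      .option("spark.mongodb.input.collection", "lookupCollection1")
      .load()
      .select("key", "param1").distinct()
      .rdd.map(r => r.getString(0) -> r.getString(1))
      .collectAsMap()
    val broadcastLkpMap = sc.broadcast(lookupMap)

    val new_rdd = rdd.map { elem =>
      val json: JValue = parse(elem)
      val key = "value"                   // extracted from the parsed json in the real code
      val param1 = broadcastLkpMap.value.getOrElse(key, "constant")
      // param1 would feed into a new JSON with a different structure (placeholder here)
      compact(render(""""""))
    }

    // Convert and write to MongoDB exactly as in the pseudo code above (targetSchema defined there)
    sqlContext.read.schema(targetSchema).json(new_rdd).write
      .option("spark.mongodb.output.uri", DBConnectionURI)
      .option("collection", "tagetCollectionName")
      .mode("append").format("com.mongodb.spark.sql").save()

    // Release the per-batch broadcast once the batch has been written
    broadcastLkpMap.unpersist()
  }
}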
