我不确定我是否正确理解spark是如何处理数据库连接的,以及如何在spark内部可靠地使用大量数据库更新操作而不会导致spark作业出错。这是我一直在使用的代码片段(为了便于说明):
val driver = new MongoDriver
val hostList: List[String] = conf.getString("mongo.hosts").split(",").toList
val connection = driver.connection(hostList)
val mongodb = connection(conf.getString("mongo.db"))
val dailyInventoryCol = mongodb[BSONCollection](conf.getString("mongo.collections.dailyInventory"))
val stream: InputDStream[(String,String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, fromOffsets,
(mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()));
def processRDD(rddElem: RDD[(String, String)]): Unit = {
val df = rdd.map(line => {
...
}).flatMap(x => x).toDF()
if (!isEmptyDF(df)) {
var mongoF: Seq[Future[dailyInventoryCol.BatchCommands.FindAndModifyCommand.FindAndModifyResult]] = Seq();
val dfF2 = df.groupBy($"CountryCode", $"Width", $"Height", $"RequestType", $"Timestamp").agg(sum($"Frequency")).collect().map(row => {
val countryCode = row.getString(0); val width = row.getInt(1); val height = row.getInt(2);
val requestType = row.getInt(3); val timestamp = row.getLong(4); val frequency = row.getLong(5);
val endTimestamp = timestamp + 24*60*60; //next day
val updateOp = dailyInventoryCol.updateModifier(BSONDocument("$inc" -> BSONDocument("totalFrequency" -> frequency)), false, true)
val f: Future[dailyInventoryCol.BatchCommands.FindAndModifyCommand.FindAndModifyResult] =
dailyInventoryCol.findAndModify(BSONDocument("width" -> width, "height" -> height, "country_code" -> countryCode, "request_type" -> requestType,
"startTs" -> timestamp, "endTs" -> endTimestamp), updateOp)
f
})
mongoF = mongoF ++ dfF2
//split into small chunk to avoid drying out the mongodb connection
val futureList: List[Seq[Future[dailyInventoryCol.BatchCommands.FindAndModifyCommand.FindAndModifyResult]]] = mongoF.grouped(200).toList
//future list
futureList.foreach(seqF => {
Await.result(Future.sequence(seqF), 40.seconds)
});
}
stream.foreachRDD(processRDD(_))
基本上,我使用的是Reactive Mongo(Scala),对于每个RDD,我都会将其转换为数据帧,对必要的数据进行分组/提取,然后针对Mongo启动大量数据库更新查询。我想问:
我正在使用mesos在3台服务器上部署spark,并为mongo数据库增加了一台服务器。这是处理数据库连接的正确方法吗。我担心的是,数据库连接/轮询是否在spark作业开始时打开,并在整个spark期间(几周、几个月……)正确维护(尽管超时/网络错误故障转移),以及是否会在每个批次完成时关闭?考虑到作业可能安排在不同的服务器上?这是否意味着每批都会打开不同的数据库连接集?
如果在执行查询时发生异常,会发生什么情况。那批的火花作业会失败吗?但下一批会继续吗?
如果有太多的查询(2000->+)无法在mongo数据库上运行更新,并且执行时间超过了配置的火花批处理持续时间(2分钟),会导致问题吗?我注意到,按照我目前的设置,在abt 2-3天后,所有的批都在Spark WebUI上以"进程"的形式排队(如果我禁用了mongo更新部分,那么我可以运行一周而没有问题),没有一个能够正确退出。这基本上会挂起所有批处理作业,直到我重新启动/重新提交作业。
非常感谢。如果你能帮我解决这个问题,我将不胜感激。
请阅读http://spark.apache.org/docs/latest/streaming-programming-guide.html.这将消除您对应如何使用/创建连接的疑虑。
其次,我建议将直接更新操作与Spark Job分开。更好的方法是你的spark作业,处理数据,然后将其发布到Kafka队列中,然后有另一个专用的进程/作业/代码,从Kafka Queue读取数据,并在Mongo DB上执行插入/更新操作。