我正在使用Spark 1.4.0作为独立集群运行的AWS EMR(不是由Yarn或Mesos管理),并且在撰写本文时,我正在使用couchbase-spark-connector版本1.0.0-beta,以及couchbase-java-client版本2.2.0-dp2。
我有一个在Couchbase中编写的视图,它只返回我文档的密钥(大约300M+密钥)。
我用Scala 2.10.4编写了一个Spark程序,对于从视图返回的每个键,我正在获取文档,并希望将其全部保存到AWS S3文件系统中。
我的问题是,Couchbase抛出了一个异常从类型:com.couchbase.client.core。BackpressureException在文档中说:
标识在使用服务时需要从供应方退出,因为消费者过载了。
所以我的问题是我如何修复这个异常不被抛出。我想我会想延迟我的请求,但我不知道该怎么做。
下面是我的Spark代码:
val couchbaseKeys = sparkContext.couchbaseView(ViewQuery.from(couchbaseDesignName, couchbaseViewName)).map(_.id).couchbaseGet[JsonDocument]()
couchbaseKeys.map(Projection.projectCouchbaseObject(_)).filter(_ != null).saveAsTextFile(pathForExportedOutput)
和投影对象及其方法:
object Projection {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
def projectCouchbaseObject(couchbaseObject: JsonDocument): String = {
try {
return couchbaseObject.id() + 't' + couchbaseObject.content()
}
catch {
case exception: Throwable => {
logger.error("Failed project couchbase object. key was: " + couchbaseObject.id(), exception)
}
}
return null
}
}
尝试调整maxRetries和maxDelay配置,这将解决您的问题。
看这里:https://github.com/couchbase/couchbase-spark-connector/blob/33e4b8382ae80e39ae0254489f5c410f10e7d549/src/main/scala/com/couchbase/spark/connection/CouchbaseConfig.scala#L30