错误使用沙发-火花连接器.抛出BackPressureException



我正在使用Spark 1.4.0作为独立集群运行的AWS EMR(不是由Yarn或Mesos管理),并且在撰写本文时,我正在使用couchbase-spark-connector版本1.0.0-beta,以及couchbase-java-client版本2.2.0-dp2

我有一个在Couchbase中编写的视图,它只返回我文档的密钥(大约300M+密钥)。

我用Scala 2.10.4编写了一个Spark程序,对于从视图返回的每个键,我正在获取文档,并希望将其全部保存到AWS S3文件系统中。

我的问题是,Couchbase抛出了一个异常从类型:com.couchbase.client.core。BackpressureException在文档中说:

标识在使用服务时需要从供应方退出,因为消费者过载了。

所以我的问题是我如何修复这个异常不被抛出。我想我会想延迟我的请求,但我不知道该怎么做。

下面是我的Spark代码:

val couchbaseKeys = sparkContext.couchbaseView(ViewQuery.from(couchbaseDesignName, couchbaseViewName)).map(_.id).couchbaseGet[JsonDocument]()
    couchbaseKeys.map(Projection.projectCouchbaseObject(_)).filter(_ != null).saveAsTextFile(pathForExportedOutput)

和投影对象及其方法:

object Projection {
  val logger: Logger = LoggerFactory.getLogger(this.getClass)
  def projectCouchbaseObject(couchbaseObject: JsonDocument): String = {
    try {
      return couchbaseObject.id() + 't' + couchbaseObject.content()
    }
    catch {
      case exception: Throwable => {
        logger.error("Failed project couchbase object. key was: " + couchbaseObject.id(), exception)
      }
    }
    return null
  }
}

尝试调整maxRetriesmaxDelay配置,这将解决您的问题。

看这里:https://github.com/couchbase/couchbase-spark-connector/blob/33e4b8382ae80e39ae0254489f5c410f10e7d549/src/main/scala/com/couchbase/spark/connection/CouchbaseConfig.scala#L30

相关内容

最新更新