NoHostAvailableException(未尝试主机)使用Spark Cassandra Connector



我在使用 DataStax Spark Connector for Cassandra 时遇到了问题。我的应用程序包含一个 Spark 操作,该操作对 Cassandra 数据库执行许多单记录查询;其中许多查询将成功,但在某些时候,其中一个查询将失败,并显示消息All host(s) tried for query failed (no host was tried)NoHostAvailableException

堆栈跟踪

2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy15.execute(Unknown Source)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy16.execute(Unknown Source)
at [line that contains the session.execute() call]
[...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
... 32 more

为了分析这个问题,我成功地在一个简单的环境中重现了它:

  • 运行 Cassandra、Spark 主控器和火花工作者的单台机器
  • 仅包含 100 条记录的简单表(10 个分区,每个分区 10 条记录(

以下是我可以重现该问题的最小代码。

法典

val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19 by 2)
val connector: CassandraConnector = [...]
val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
connector.withSessionDo { session =>
val clusteringKeyValues = Seq(...)
val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")
iterator.flatMap { pkColumn2Value =>
val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
preparedStatement.bind(
pkColumn1Value.asInstanceOf[AnyRef]
, pkColumn2Value.asInstanceOf[AnyRef]
, clusteringKeyValue.asInstanceOf[AnyRef]
)
)
boundStatements.map { boundStatement =>
val record = try {
session.execute(boundStatement).one()
} catch {
case noHostAvailableException: NoHostAvailableException =>
log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
throw noHostAvailableException
case exception =>
throw exception
}
log.error(s"Retrieved record $record")
// Sleep to simulate an operation being performed on the value.
Thread.sleep(100)
record
}
}
}
}
log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")

我注意到的一些有趣的事情

  • 我正在使用Dataset#mapPartitions(),以便每个分区只能准备一次选择语句。当我吞下我的骄傲并使用Dataset#map()Dataset#flatMap()时,问题就消失了,但我想使用Dataset#mapPartitions()来实现每个数据集分区仅准备一次查询的(表面上(性能优势。
  • NoHostAvailableException似乎在执行第一个查询后固定发生一段时间。一些调查证实,这个时间量等于连接器属性spark.cassandra.connection.keep_alive_ms的值。将此属性设置为高得离谱的值表面上可以解决问题,但这似乎是一个肮脏的解决方法,而不是明智的解决方案。

在连接器的此 GitHub 问题中,评论者 pkolaczk 提到了一个潜在问题,该问题可能导致连接器在与 Cassandra 的初始连接中成功,并在以后尝试建立其他连接时失败。这听起来很有希望,因为它与上述几点相匹配(这表明只有在关闭原始连接后才会出现问题,如果单独为数据集中的每个元素重新建立连接,则永远不会发生(;但是,我无法找到任何迹象表明我错误配置了IP地址或导致这种现象的任何其他合理原因(甚至无法确认这种现象实际上首先导致了问题(。

我检查和/或尝试过的一些事情

  • 多个在线资源表明,NoHostAvailableException总是在前面出现其他错误。我已经多次检查了我的日志,但找不到任何其他错误消息或堆栈跟踪。
  • 另一个 StackOverflow 问题的答案建议调用NoHostAvailableException#getErrors以获取问题的更详细说明,但此方法总是为我返回一个空映射。
  • 当我使用 RDD 而不是数据集时,问题仍然存在(包括它仅在使用mapPartitions时发生而不是在使用map时发生的事实(。
  • 连接器属性spark.cassandra.connection.local_dc最初未设置。将此属性设置为适当的数据中心名称对问题没有明显影响。
  • 我尝试将连接器属性设置为spark.cassandra.connection.timeout_msspark.cassandra.read.timeout_ms到高得离谱的值;这对问题没有明显的影响。

某些版本号

  • Spark:在 2.1.1 和 2.3.0 中重现了该问题
  • 卡桑德拉:3.11
  • 连接器:在 2.0.3
  • 和 2.3.0 中都重现了该问题
  • 斯卡拉:2.11

任何导致这些错误的原因或如何解决问题的想法的指示将不胜感激。

我将这个问题交叉发布到连接器的Google用户组(https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ(,其中一位贡献者确认没有理由不具有高价值spark.cassandra.connection.keep_alive_ms。我已经将这个值提高到我可以合理地确定没有操作会通过它的程度,并且从那以后没有任何问题。

最新更新