I'm trying to run my first Spark job (a Scala job that accesses Cassandra), and it fails with the following error:
java.io.IOException: Failed to open native connection to Cassandra at {<ip>}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
...........
............
Caused by: java.lang.IllegalArgumentException: Contact points contain multiple data centers:
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.init(LocalNodeFirstLoadBalancingPolicy.scala:47)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1099)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:271)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
What are we doing wrong here?
I'm using:
- Spark 1.5.2
- Apache Cassandra 2.1.10
- Spark Cassandra Connector 1.3.1 / 1.5.0-M2 (tried both connectors)
- Scala version 2.10.4
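--> According to the author, a fix is currently in progress. See the comments below this answer.
I found this in the documentation; I hope it helps you: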
override def init(cluster: Cluster, hosts: JCollection[Host]) {
  nodes = hosts.toSet
  // use explicitly set DC if available, otherwise see if all contact points have same DC
  // if so, use that DC; if not, throw an error
  dcToUse = localDC match {
    case Some(local) => local
    case None =>
      val dcList = dcs(nodesInTheSameDC(contactPoints, hosts.toSet))
      if (dcList.size == 1)
        dcList.head
      else
        throw new IllegalArgumentException(s"Contact points contain multiple data centers: ${dcList.mkString(", ")}")
  }
  clusterMetadata = cluster.getMetadata
}
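To make the decision in that snippet easier to follow, here is a minimal plain-Java restatement of the same rule. It is only a sketch, not the connector's code: the class `LocalDcResolver`, the method `resolve`, and the map from contact point to data center are all hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper that mirrors the rule in the Scala snippet above.
final class LocalDcResolver {

    // Returns the explicitly configured DC if one is given; otherwise the single
    // DC shared by all contact points. Throws if there is not exactly one DC.
    static String resolve(Optional<String> localDc, Map<String, String> contactPointToDc) {
        if (localDc.isPresent()) {
            return localDc.get();
        }
        Set<String> dcs = new TreeSet<>(contactPointToDc.values());
        if (dcs.size() == 1) {
            return dcs.iterator().next();
        }
        throw new IllegalArgumentException(
            "Contact points contain multiple data centers: " + String.join(", ", dcs));
    }

    public static void main(String[] args) {
        // Assumed mapping; a real cluster reports the DC of each node itself.
        Map<String, String> mixed = new LinkedHashMap<>();
        mixed.put("DC1_node1", "DC1");
        mixed.put("DC2_node1", "DC2");

        // An explicit local DC short-circuits the check.
        System.out.println(resolve(Optional.of("DC1"), mixed));

        // Without one, contact points from two DCs are rejected.
        try {
            resolve(Optional.empty(), mixed);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the exception is only reachable when no local data center is configured and the contact points do not all belong to one data center.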
I ran into the same problem when trying to connect to two Cassandra data centers with Apache Spark 2.x.x.
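import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SparkCassandraTest {
    // Placeholder values; the original post did not show these definitions.
    private static final String APP_NAME = "SparkCassandraTest";
    private static final String CASSANDRA_USERNAME = "<username>";
    private static final String CASSANDRA_PASSWORD = "<password>";
    // Contact points from both data centers (DC1 and DC2) in one list.
    private static final String CASSANDRA_ENDPOINTS = "DC1_node1,DC1_node2,DC1_node3,DC2_node1,DC2_node2,DC2_node3";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
        sparkConf.set("spark.cassandra.connection.host", CASSANDRA_ENDPOINTS);
        sparkConf.set("spark.cassandra.auth.username", CASSANDRA_USERNAME);
        sparkConf.set("spark.cassandra.auth.password", CASSANDRA_PASSWORD);
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();
        //.....................
        //.....................
        //.....................
    }
}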
Caused by: java.lang.IllegalArgumentException: requirement failed: Contact points contain multiple data centers: DC2-XXXX2,DC1-XXXX1
I resolved this by connecting to only one of the Cassandra data centers: either (DC1_node1, DC1_node2, DC1_node3) or (DC2_node1, DC2_node2, DC2_node3).
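As a rough illustration of that workaround, the sketch below builds a contact-point string that contains hosts from a single data center only. The class `ContactPoints`, the method `forDataCenter`, and the host-to-DC map are hypothetical; the node names are the placeholders used in this answer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: keep only the contact points that belong to one data center.
final class ContactPoints {

    // Returns the hosts mapped to the given DC, comma-separated, in insertion order.
    static String forDataCenter(Map<String, String> hostToDc, String dc) {
        return hostToDc.entrySet().stream()
            .filter(e -> e.getValue().equals(dc))
            .map(Map.Entry::getKey)
            .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Assumed mapping of placeholder host names to data centers.
        Map<String, String> hostToDc = new LinkedHashMap<>();
        hostToDc.put("DC1_node1", "DC1");
        hostToDc.put("DC1_node2", "DC1");
        hostToDc.put("DC1_node3", "DC1");
        hostToDc.put("DC2_node1", "DC2");
        hostToDc.put("DC2_node2", "DC2");
        hostToDc.put("DC2_node3", "DC2");

        // prints "DC1_node1,DC1_node2,DC1_node3"
        System.out.println(forDataCenter(hostToDc, "DC1"));
    }
}
```

The resulting string could then be passed as the value of spark.cassandra.connection.host in place of the mixed list.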