Cassandra Cluster无法通过Spark看到节点



我正在尝试通过Spark进行写入。我的集群中有6个节点,我在其中设置了键空间,我想在其中写入数据:

CREATE KEYSPACE traffic WITH replication = {'class': 'SimpleStrategy',    'replication_factor': '2'}  AND durable_writes = true;

当我试图从Spark写,我得到这样的错误:

16/08/17 16:14:57 ERROR QueryExecutor: Failed to execute:  com.datastax.spark.connector.writer.RichBatchStatement@7409fd2d
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency ONE (1 required but only 0 alive)

这是我正在做的代码片段:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types.{StructType, StructField, DateType,  IntegerType};


object ff {
def main(string: Array[String]) {
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.cassandra.connection.host","ONE")
  .setMaster("local[4]")
  .setAppName("ff")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true") // Use first line of all files as header
  .option("inferSchema", "true")
  .load("test.csv")
df.registerTempTable("ff_table")
//df.printSchema()
df.count
time {
  df.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
    .save()
}
def time[A](f: => A) = {
  val s = System.nanoTime
  val ret = f
  println("time: " + (System.nanoTime - s) / 1e6 + "ms")
  ret
}

 }
}

同样,如果我运行nodetool describecluster,我得到了这个结果:

Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
    bf6c3ae7-5c8b-3e5d-9794-8e34bee9278f: [127.0.0.1, 127.0.0.2, 127.0.0.3, 127.0.0.4, 127.0.0.5, 127.0.0.6]

我试图在CLI中插入replication_factor:2的行,它正在工作,所以每个节点都可以看到对方。为什么Spark不能插入任何东西,为什么节点在试图从Spark插入数据时看不到彼此,有人知道吗?

看起来您通过环回在一台机器上运行6个节点。这意味着这台机器的资源很有可能被超额订阅。各种Cassandra实例很可能轮流或交换,这导致它们在负载过重时丢失。增加复制因子会增加有效目标启动的几率,但会进一步增加负载。

C*的核心需要来自系统的几个不同的资源,如果其中任何一个成为瓶颈,其中任何一个都有可能导致节点在足够的时间内无法响应八卦。

这些资源是RAM——JVM能够获得多少内存,这也受到操作系统交换的影响。这意味着如果您分配了一个很大的JVM,但是操作系统将其交换到磁盘上,那么您可能会看到大量的性能问题。对于同一台机器上的多个节点,您需要确保为正在启动的每个节点的JVM提供足够的内存。此外,如果任何一个实例的JVM快满了,你就会进入GC,可能会出现GC风暴,基本上会锁定该实例。其中许多细节将在system.log中清楚显示。

CPU -如果没有对至少一个CPU的独占访问,你几乎可以保证在C*中有一些重要的线程被调度,它们之间有很长的延迟。这可能导致八卦线程被忽略,八卦失败。这将给一些节点一个有故障机器的集群视图,并导致不可用的错误。

DISK -每个Cassandra实例将维护它自己的CommitLog和HD文件。提交日志每10秒刷新一次,如果您有多个实例并且只有一个硬盘驱动器,那么提交日志和普通memtable之间的刷新很容易互相阻塞。这进一步复杂化了压缩,这需要另外大量的IO。

NETWORK -虽然这不是同一台机器上的多个节点的问题。

在求和,重要的是要确保分配给C*实例的资源足够小,以至于没有实例会占用另一个实例的空间/ram/cpu。如果这样做,将导致集群在负载下通信失败,因为上述资源之一出现了瓶颈。这并不意味着不可能在同一台机器上运行多个节点,而是意味着在配置时必须小心。您还可以尝试通过限制写速度来减少负载,这将减少节点之间相互攻击的机会。

相关内容

  • 没有找到相关文章

最新更新