I have a Zeppelin notebook running on Docker. I wrote the following code using Cassandra:
import org.apache.spark.sql.cassandra._
val cqlContext = new CassandraSQLContext(sc)
cqlContext.sql("select * from demo.table").collect.foreach(println)
However, I get this error:
import org.apache.spark.sql.cassandra._
cqlContext: org.apache.spark.sql.cassandra.CassandraSQLContext = org.apache.spark.sql.cassandra.CassandraSQLContext@395e28a8
com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Cannot build a cluster without contact points
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2199)
at com.google.common.cache.LocalCache.get(LocalCache.java:3932)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3936)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4806)
at org.apache.spark.sql.cassandra.CassandraCatalog.lookupRelation(CassandraCatalog.scala:28)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:219)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:137)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:137)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.lookupRelation(CassandraSQLContext.scala:219)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC$$iwC.<init>(<console>:53)
at $iwC$$iwC.<init>(<console>:55)
at $iwC.<init>(<console>:57)
at <init>(<console>:59)
at .<init>(<console>:63)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at com.nflabs.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:541)
at com.nflabs.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:517)
at com.nflabs.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:510)
at com.nflabs.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:40)
at com.nflabs.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:76)
at com.nflabs.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:246)
at com.nflabs.zeppelin.scheduler.Job.run(Job.java:152)
at com.nflabs.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Cannot build a cluster without contact points
at com.datastax.driver.core.Cluster.checkNotEmpty(Cluster.java:116)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:108)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:177)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1109)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:78)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:167)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:99)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:173)
at org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:22)
at org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:19)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3522)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2315)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2193)
... 92 more
From the Docker command line I ran docker pull cassandra, but the problem persists.
What do I need to do to be able to use Cassandra?
To connect to a Cassandra cluster, you must provide at least one node of the cluster as a contact point in the Spark conf, like this:
conf.set("spark.cassandra.connection.host", "127.0.0.1")
I had the same problem, Cannot build a cluster without contact points, and managed to solve it by setting up the SparkConf() as follows:
conf = SparkConf() \
    .setAppName("MyApp") \
    .setMaster("spark://127.0.0.1:7077") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
A basic Spark (PySpark) script putting this together:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf() \
    .setAppName("PySpark Cassandra Test") \
    .setMaster("spark://127.0.0.1:7077") \
    .set("spark.cassandra.connection.host", "127.0.0.1")

# Note: the positional 'local' master passed here takes precedence over setMaster() above
sc = SparkContext('local', conf=conf)
sql = SQLContext(sc)

test = sql.read.format("org.apache.spark.sql.cassandra") \
    .load(keyspace="mykeyspace", table="mytable")
test.collect()