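Ahoy, fellow developers. I am running some basic analysis in Spark in which I query a multi-node Cassandra cluster. The code I am running, and where I am seeing something that does not make sense to me, is: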
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import com.datastax.spark.connector._

    object cassSpark {
      def main(args: Array[String]): Unit = {
        // Both contact points go into one comma-separated value;
        // setting the same key twice keeps only the last value.
        val conf = new SparkConf()
          .set("spark.cassandra.connection.host", "192.168.56.101,192.168.56.102")
          .set("spark.cassandra.connection.local_dc", "datacenter1")
          .setMaster("local[*]")
          .setAppName("cassSpark")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        // Measures loading the table, caching it and counting its rows.
        time {
          val df = sqlContext
            .read
            .format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
            .load().cache()
          df.count()
        }

        // Runs f, prints the elapsed wall-clock time in milliseconds and returns f's result.
        def time[A](f: => A) = {
          val s = System.nanoTime
          val ret = f
          println("time: " + (System.nanoTime - s) / 1e6 + "ms")
          ret
        }
      }
    }
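So, the Spark version is 1.6.0, Cassandra is v3.0.10, and the connector is also 1.6.0. The keyspace has replication_factor: 1, and the table has 5 columns with only 1 row in it. As you can see, there are 2 nodes (virtual machines created in OracleVM).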
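My problem is that when I measure the query time from Spark to Cassandra, I get about 20 seconds, which does not seem normal to me given that the table holds only one row. Am I missing something, is my measurement wrong, or is something wrong in my code? Can someone help me, or show me how to do this efficiently?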
[EDIT]
As @Artem Aliev asked, here is the full INFO log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/12/06 12:31:04 INFO SparkContext: Running Spark version 1.6.0
16/12/06 12:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/06 12:31:07 INFO SecurityManager: Changing view acls to: superbrainbug
16/12/06 12:31:07 INFO SecurityManager: Changing modify acls to: superbrainbug
16/12/06 12:31:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(superbrainbug); users with modify permissions: Set(superbrainbug)
16/12/06 12:31:12 INFO Utils: Successfully started service 'sparkDriver' on port 62101.
16/12/06 12:31:14 INFO Slf4jLogger: Slf4jLogger started
16/12/06 12:31:14 INFO Remoting: Starting remoting
16/12/06 12:31:15 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:62114]
16/12/06 12:31:15 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 62114.
16/12/06 12:31:15 INFO SparkEnv: Registering MapOutputTracker
16/12/06 12:31:15 INFO SparkEnv: Registering BlockManagerMaster
16/12/06 12:31:15 INFO DiskBlockManager: Created local directory at C:\Users\superbrainbug\AppData\Local\Temp\blockmgr-8b664e71-ead1-4462-b171-bf542a5eb444
16/12/06 12:31:15 INFO MemoryStore: MemoryStore started with capacity 1124.6 MB
16/12/06 12:31:18 INFO SparkEnv: Registering OutputCommitCoordinator
16/12/06 12:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/12/06 12:31:20 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/12/06 12:31:20 INFO Executor: Starting executor ID driver on host localhost
16/12/06 12:31:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62136.
16/12/06 12:31:20 INFO NettyBlockTransferService: Server created on 62136
16/12/06 12:31:20 INFO BlockManagerMaster: Trying to register BlockManager
16/12/06 12:31:21 INFO BlockManagerMasterEndpoint: Registering block manager localhost:62136 with 1124.6 MB RAM, BlockManagerId(driver, localhost, 62136)
16/12/06 12:31:21 INFO BlockManagerMaster: Registered BlockManager
16/12/06 12:31:23 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
16/12/06 12:31:24 INFO Cluster: New Cassandra host /192.168.56.101:9042 added
16/12/06 12:31:24 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []
16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69
16/12/06 12:31:31 INFO DAGScheduler: Registering RDD 7 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Got job 0 (count at cassSpark.scala:69) with 1 output partitions
16/12/06 12:31:31 INFO DAGScheduler: Final stage: ResultStage 1 (count at cassSpark.scala:69)
16/12/06 12:31:31 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/12/06 12:31:31 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 20.0 KB, free 20.0 KB)
16/12/06 12:31:35 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.2 KB, free 29.2 KB)
16/12/06 12:31:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:62136 (size: 9.2 KB, free: 1124.6 MB)
16/12/06 12:31:35 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:35 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at cassSpark.scala:69)
16/12/06 12:31:35 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/12/06 12:31:35 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 20008 bytes)
16/12/06 12:31:35 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/12/06 12:31:35 INFO CacheManager: Partition rdd_4_0 not found, computing it
16/12/06 12:31:36 INFO GenerateUnsafeProjection: Code generated in 395.192925 ms
16/12/06 12:31:39 INFO MemoryStore: Block rdd_4_0 stored as values in memory (estimated size 1168.0 B, free 30.3 KB)
16/12/06 12:31:39 INFO BlockManagerInfo: Added rdd_4_0 in memory on localhost:62136 (size: 1168.0 B, free: 1124.6 MB)
16/12/06 12:31:39 INFO GeneratePredicate: Code generated in 35.193967 ms
16/12/06 12:31:39 INFO GenerateColumnAccessor: Code generated in 52.555004 ms
16/12/06 12:31:39 INFO GenerateMutableProjection: Code generated in 14.129896 ms
16/12/06 12:31:39 INFO GenerateUnsafeProjection: Code generated in 14.130749 ms
16/12/06 12:31:40 INFO GenerateMutableProjection: Code generated in 19.217034 ms
16/12/06 12:31:40 INFO GenerateUnsafeRowJoiner: Code generated in 72.736302 ms
16/12/06 12:31:40 INFO GenerateUnsafeProjection: Code generated in 133.407346 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 3895 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5452 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/12/06 12:31:41 INFO DAGScheduler: ShuffleMapStage 0 (count at cassSpark.scala:69) finished in 5,542 s
16/12/06 12:31:41 INFO DAGScheduler: looking for newly runnable stages
16/12/06 12:31:41 INFO DAGScheduler: running: Set()
16/12/06 12:31:41 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/12/06 12:31:41 INFO DAGScheduler: failed: Set()
16/12/06 12:31:41 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69), which has no missing parents
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.3 KB, free 39.6 KB)
16/12/06 12:31:41 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 44.1 KB)
16/12/06 12:31:41 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:62136 (size: 4.6 KB, free: 1124.6 MB)
16/12/06 12:31:41 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/12/06 12:31:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at cassSpark.scala:69)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/12/06 12:31:41 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1999 bytes)
16/12/06 12:31:41 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/12/06 12:31:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 92.969655 ms
16/12/06 12:31:41 INFO GenerateMutableProjection: Code generated in 11.48414 ms
16/12/06 12:31:41 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1830 bytes result sent to driver
16/12/06 12:31:41 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 602 ms on localhost (1/1)
16/12/06 12:31:41 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/12/06 12:31:41 INFO DAGScheduler: ResultStage 1 (count at cassSpark.scala:69) finished in 0,605 s
16/12/06 12:31:41 INFO DAGScheduler: Job 0 finished: count at cassSpark.scala:69, took 10,592173 s
time: 19242.858679ms
16/12/06 12:31:48 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/12/06 12:31:48 INFO SparkContext: Invoking stop() from shutdown hook
16/12/06 12:31:48 INFO SerialShutdownHooks: Successfully executed shutdown hook: Clearing session cache for C* connector
16/12/06 12:31:48 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/12/06 12:31:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/06 12:31:48 INFO MemoryStore: MemoryStore cleared
16/12/06 12:31:48 INFO BlockManager: BlockManager stopped
16/12/06 12:31:48 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/06 12:31:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/06 12:31:48 INFO SparkContext: Successfully stopped SparkContext
16/12/06 12:31:48 INFO ShutdownHookManager: Shutdown hook called
16/12/06 12:31:48 INFO ShutdownHookManager: Deleting directory C:\Users\superbrainbug\AppData\Local\Temp\spark-17606f05-6bb8-4144-acb5-015f15ea1ea9
Process finished with exit code 0
Also, for @yves Darmaillac: if I remove .cache(), the run time is 12 s.
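20 seconds looks a bit higher than expected for your environment; it should be around 2 seconds. You could enable INFO-level Spark logging and post the results here.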
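Once an INFO log is available, the timestamps at the start of each line give a rough picture of where the wall-clock time goes. The following is a minimal sketch, not part of Spark or the connector, that prints the gaps between consecutive timestamped lines. The object name `LogGaps`, its helper names and the 3-second threshold are illustrative assumptions; it also assumes Java 8 or later (for `java.time`) and Spark's default `yy/MM/dd HH:mm:ss` line prefix. The sample lines are a handful copied from the log posted in the question, so the gaps are between those selected lines only.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object LogGaps {
  // Assumed prefix of every log line: "yy/MM/dd HH:mm:ss" (17 characters).
  private val stamp = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  // Parse the leading timestamp of a line; None if the line has no such prefix.
  def timestamp(line: String): Option[LocalDateTime] =
    if (line.length < 17) None
    else scala.util.Try(LocalDateTime.parse(line.substring(0, 17), stamp)).toOption

  // For each pair of consecutive timestamped lines, return
  // (gap in seconds, the later line), keeping only gaps >= minSeconds.
  def gaps(lines: Seq[String], minSeconds: Long): Seq[(Long, String)] = {
    val stamped = lines.flatMap(l => timestamp(l).map(t => (t, l)))
    stamped.zip(stamped.drop(1)).collect {
      case ((t1, _), (t2, later)) if ChronoUnit.SECONDS.between(t1, t2) >= minSeconds =>
        (ChronoUnit.SECONDS.between(t1, t2), later)
    }
  }

  def main(args: Array[String]): Unit = {
    // A few lines taken from the posted log (abridged selection).
    val sample = Seq(
      "16/12/06 12:31:21 INFO BlockManagerMaster: Registered BlockManager",
      "16/12/06 12:31:24 INFO Cluster: New Cassandra host /192.168.56.101:9042 added",
      "16/12/06 12:31:27 INFO CassandraSourceRelation: Input Predicates: []",
      "16/12/06 12:31:31 INFO SparkContext: Starting job: count at cassSpark.scala:69",
      "16/12/06 12:31:41 INFO DAGScheduler: Job 0 finished: count at cassSpark.scala:69, took 10,592173 s"
    )
    gaps(sample, 3).foreach { case (s, line) => println(s"+${s}s before: $line") }
  }
}
```

Fed the complete log instead of the sample, the same function would list every pause of at least the chosen threshold, which may make it easier to tell connection setup, query planning and task execution apart.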
The main time consumer in your example is initialization; a second run will be much faster.
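- Spark is a lazy framework, so a lot of initialization is done during the first query. You are using the "local" master, so it will be even slower when you run it on a Spark cluster.
- The connector is optimized for bulk queries, so it does a lot of preparation to make bulk queries run faster: it queries the C* cluster size and token range locations, creates splits, establishes C* connections, and so on.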
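The claim that the first run pays for initialization can be checked by timing the same action several times inside one JVM instead of once. Below is a minimal sketch in plain Scala; the object name `WarmupTiming`, the helper `timeRuns` and the `expensiveSetup` stand-in are hypothetical and exist only to illustrate the pattern (the real one-time cost in the question would be connection setup, schema lookup, code generation and similar work).

```scala
object WarmupTiming {
  // Evaluate `block` `runs` times and return the elapsed milliseconds
  // of each evaluation, in order.
  def timeRuns[A](runs: Int)(block: => A): Seq[Double] =
    (1 to runs).map { _ =>
      val start = System.nanoTime
      block
      (System.nanoTime - start) / 1e6
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for one-time setup: only the first access sleeps.
    lazy val expensiveSetup: Int = { Thread.sleep(200); 42 }

    val times = timeRuns(3) { expensiveSetup + 1 }
    times.zipWithIndex.foreach { case (ms, i) =>
      println(f"run ${i + 1}: $ms%.1f ms")
    }
  }
}
```

In the question's program, the block passed to `timeRuns` would be the `sqlContext.read ... .load().count()` expression. If the explanation above holds, the first measurement should dominate and the later ones should be much smaller.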
Why do you persist the result of load()? It is not necessary, since you do not reuse it. Is the timing still the same without the cache?
Try the following:
    // Same measurement as in the question, but without .cache().
    time {
      val df = sqlContext
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "nth1", "keyspace" -> "nth", "cluster" -> "Test Cluster"))
        .load()
      df.count()
    }