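I have a Maven Scala application that submits a Spark job to a standalone, single-node Spark cluster. Once the job is submitted, the Spark application tries to access Cassandra, hosted on an Amazon EC2 instance, through spark-cassandra-connector. The connection is established, but no results are returned, and after a while the connector disconnects. If I run Spark in local mode, it works fine. I tried creating a simple application, and my code looks like this: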
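import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

val sc = SparkContextLoader.getSC
def runSparkJob(): Unit = {
  val table = sc.cassandraTable("prosolo_logs_zj", "logevents")
  // Print one row per line
  println(table.collect().mkString("\n"))
}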
SparkContext.scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkContextLoader {
  val sparkConf = new SparkConf()
  sparkConf.setMaster("spark://127.0.1.1:7077")
  sparkConf.set("spark.cores_max","2")
  sparkConf.set("spark.executor.memory","2g")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.setAppName("Test application")
  sparkConf.set("spark.cassandra.connection.host", "xxx.xxx.xxx.xxx")
  sparkConf.set("spark.cassandra.connection.port", "9042")
  sparkConf.set("spark.ui.port","4041")
  // Jar shipped to the executors
  val oneJar="/samplesparkmaven/target/samplesparkmaven-jar.jar"
  sparkConf.setJars(List(oneJar))
  @transient val sc = new SparkContext(sparkConf)
}
The console output looks like this:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/02/14 23:11:25 INFO SparkContext: Running Spark version 2.1.0
17/02/14 23:11:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/14 23:11:27 WARN Utils: Your hostname, zoran-Latitude-E5420 resolves to a loopback address: 127.0.1.1; using 192.168.2.68 instead (on interface wlp2s0)
17/02/14 23:11:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/02/14 23:11:27 INFO SecurityManager: Changing view acls to: zoran
17/02/14 23:11:27 INFO SecurityManager: Changing modify acls to: zoran
17/02/14 23:11:27 INFO SecurityManager: Changing view acls groups to:
17/02/14 23:11:27 INFO SecurityManager: Changing modify acls groups to:
17/02/14 23:11:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(zoran); groups with view permissions: Set(); users with modify permissions: Set(zoran); groups with modify permissions: Set()
17/02/14 23:11:28 INFO Utils: Successfully started service 'sparkDriver' on port 33995.
17/02/14 23:11:28 INFO SparkEnv: Registering MapOutputTracker
17/02/14 23:11:28 INFO SparkEnv: Registering BlockManagerMaster
17/02/14 23:11:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/02/14 23:11:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/02/14 23:11:28 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7b25a4cc-cb37-4332-a59b-e36fa45511cd
17/02/14 23:11:28 INFO MemoryStore: MemoryStore started with capacity 870.9 MB
17/02/14 23:11:28 INFO SparkEnv: Registering OutputCommitCoordinator
17/02/14 23:11:28 INFO Utils: Successfully started service 'SparkUI' on port 4041.
17/02/14 23:11:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.2.68:4041
17/02/14 23:11:28 INFO SparkContext: Added JAR /samplesparkmaven/target/samplesparkmaven-jar.jar at spark://192.168.2.68:33995/jars/samplesparkmaven-jar.jar with timestamp 1487142688817
17/02/14 23:11:28 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://127.0.1.1:7077...
17/02/14 23:11:28 INFO TransportClientFactory: Successfully created connection to /127.0.1.1:7077 after 62 ms (0 ms spent in bootstraps)
17/02/14 23:11:29 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170214231129-0016
17/02/14 23:11:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36901.
17/02/14 23:11:29 INFO NettyBlockTransferService: Server created on 192.168.2.68:36901
17/02/14 23:11:29 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/02/14 23:11:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.2.68, 36901, None)
17/02/14 23:11:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.68:36901 with 870.9 MB RAM, BlockManagerId(driver, 192.168.2.68, 36901, None)
17/02/14 23:11:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.2.68, 36901, None)
17/02/14 23:11:29 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.2.68, 36901, None)
17/02/14 23:11:29 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/02/14 23:11:29 INFO NettyUtil: Found Netty's native epoll transport in the classpath, using it
17/02/14 23:11:31 INFO Cluster: New Cassandra host /xxx.xxx.xxx.xxx:9042 added
17/02/14 23:11:31 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
17/02/14 23:11:32 INFO SparkContext: Starting job: collect at SparkConnector.scala:28
17/02/14 23:11:32 INFO DAGScheduler: Got job 0 (collect at SparkConnector.scala:28) with 6 output partitions
17/02/14 23:11:32 INFO DAGScheduler: Final stage: ResultStage 0 (collect at SparkConnector.scala:28)
17/02/14 23:11:32 INFO DAGScheduler: Parents of final stage: List()
17/02/14 23:11:32 INFO DAGScheduler: Missing parents: List()
17/02/14 23:11:32 INFO DAGScheduler: Submitting ResultStage 0 (CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18), which has no missing parents
17/02/14 23:11:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 8.4 KB, free 870.9 MB)
17/02/14 23:11:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 870.9 MB)
17/02/14 23:11:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.2.68:36901 (size: 4.4 KB, free: 870.9 MB)
17/02/14 23:11:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/02/14 23:11:32 INFO DAGScheduler: Submitting 6 missing tasks from ResultStage 0 (CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18)
17/02/14 23:11:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 6 tasks
17/02/14 23:11:39 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
I am using:
- Scala 2.11.6
- Spark 2.1.0 (both for the standalone Spark and for the dependency in the application)
- spark-cassandra-connector 2.0.0-M3
- Cassandra Java driver 3.0.0
- Apache Cassandra 3.9
The version compatibility table for the Cassandra connector does not show any problem, but I cannot figure out what else could be wrong.
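I finally solved the problem I was having. It turned out to be a path problem. I was using a local path to the jar but had missed the "." at the beginning, so it was treated as an absolute path. Unfortunately, the application raised no exception indicating that the file did not exist at the given path; the only exception I got came from the worker, which could not find the jar file in the Spark cluster.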
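Since nothing on the application side complained about the missing file, one way to catch this kind of mistake earlier is to check the jar path yourself before handing it to setJars. Here is a minimal sketch using only java.nio; JarPathCheck and resolveJar are hypothetical names I made up for illustration, not part of Spark or the connector.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper (not a Spark API): validates a jar path up front.
object JarPathCheck {
  // Resolves the given path against the current working directory.
  // Returns Right(absolute path) if a regular file exists there,
  // or Left(error message) if it does not.
  def resolveJar(jar: String): Either[String, String] = {
    val path: Path = Paths.get(jar).toAbsolutePath.normalize()
    if (Files.isRegularFile(path)) Right(path.toString)
    else Left(s"Jar not found: $jar (resolved to $path)")
  }
}
```

In my case I would call JarPathCheck.resolveJar("./samplesparkmaven/target/samplesparkmaven-jar.jar") and pass the Right value to sparkConf.setJars, failing fast on a Left. The error message includes the resolved absolute path, which would have made the missing "." obvious right away.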