I am trying to query two tables from Cassandra and then join the two dataframes into a single dataframe (result).
I can get the correct result, and the Spark job finishes normally on my own machine. However, when I submit it to the Spark server (local mode), the job just hangs with no exception or error message, and still has not finished after an hour, until I press Ctrl-C to stop it.
I don't know why the job doesn't work on the Spark server, or what the difference is between running in Eclipse and on the Spark server. If the cause were some underlying problem, wouldn't an exception be thrown instead of the job just hanging?
Any suggestions? Thanks~
The submit command:
/usr/bin/spark-submit --class com.test.c2c --jars file:///home/iotcloud/Documents/grace/spark/spark-cassandra-connector-1.6.3-s_2.10.jar file:///home/iotcloud/Documents/grace/spark/C2C_1205.jar
Here is my Scala code:
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

object c2c {
  def main(args: Array[String]) {
    println("Start...")
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.2.1.67")
      .setAppName("ConnectToCassandra")
      .setMaster("local")
    val sc = new SparkContext(conf)
    println("Cassandra setting done...")
    println("================================================1")
    println("Start to save to cassandra...")
    val cc = new CassandraSQLContext(sc)
    cc.setKeyspace("iot_test")
    val df_info = cc.sql("select gatewaymac,sensormac,sensorid,sensorfrequency,status from tsensor_info where gatewaymac != 'failed'")
    val df_loc = cc.sql("select sensorlocationid,sensorlocationname,company,plant,department,building,floor,sensorid from tsensorlocation_info where sensorid != 'NULL'")
    println("================================================2")
    println("registerTempTable...")
    df_info.registerTempTable("info")
    df_loc.registerTempTable("loc")
    println("================================================4")
    println("mapping table...")
    println("===info===")
    df_info.printSchema()
    df_info.take(5).foreach(println)
    println("===location===")
    df_loc.printSchema()
    df_loc.take(5).foreach(println)
    println("================================================5")
    println("print mapping result")
    val result = df_info.join(df_loc, "sensorid")
    result.registerTempTable("ref")
    result.printSchema()
    result.take(5).foreach(println)
    println("====Finish====")
    sc.stop()
  }
}
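One detail worth noting in the code above: a master hard-coded with `setMaster` takes precedence over the `--master` flag passed to `spark-submit`, so `.setMaster("local")` pins the job to a single local JVM no matter how it is submitted. A minimal sketch (assuming the same Spark 1.6-era API as the question; `C2CSubmit` is a hypothetical name for illustration) of leaving the master out of the code so it can be chosen at submit time:

```scala
// Sketch, assuming Spark 1.6 / spark-cassandra-connector 1.6.x as in the question.
// Omitting setMaster lets spark-submit's --master flag decide where the job runs;
// hard-coding "local" would force single-JVM execution regardless of that flag.
import org.apache.spark.{SparkConf, SparkContext}

object C2CSubmit {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.2.1.67")
      .setAppName("ConnectToCassandra")
      // no .setMaster(...) here -- supply it with --master at submit time
    val sc = new SparkContext(conf)
    // ... the same Cassandra queries and join as above ...
    sc.stop()
  }
}
```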
The normal output in Eclipse:
Cassandra setting done...
================================================1
Start to save to cassandra...
================================================2
registerTempTable...
================================================4
mapping table...
===info===
root
|-- gatewaymac: string (nullable = true)
|-- sensormac: string (nullable = true)
|-- sensorid: string (nullable = true)
|-- sensorfrequency: string (nullable = true)
|-- status: string (nullable = true)
[0000aaaaaaaaat7f,e9d050f0ebc25000 ,0000aaaaaaaaat7f3242,null,N]
[000000000000b219,c879b4f921c25000 ,000000000000b2193590,00:01,N]
[0000aaaaaaaaaabb,2c153cf9f0c25000 ,0000aaaaaaaaaabba353,null,Y]
[000000000000a412,17da712795c25000 ,000000000000a4126156,00:05,Y]
[000000000000a104,b2a4b8b7a6c25000 ,000000000000a1046340,00:01,N]
===location===
root
|-- sensorlocationid: string (nullable = true)
|-- sensorlocationname: string (nullable = true)
|-- company: string (nullable = true)
|-- plant: string (nullable = true)
|-- department: string (nullable = true)
|-- building: string (nullable = true)
|-- floor: string (nullable = true)
|-- sensorid: string (nullable = true)
[JA092,A1F-G-L00-S066,IAC,IACJ,MT,A,1,000000000000a108a19f]
[JA044,A2F-I-L00-S037,IAC,IACJ,MT,A,2,000000000000a2024246]
[JA111,A2F-C-L00-S076,IAC,IACJ,MPA,A,2,000000000000a210c710]
[PA041,A1F-SMT-S03,IAC,IACP,SMT,A,1,000000000000a10354c1]
[PC010,C3F-IQC-S03,IAC,IACP,IQC,C,3,000000000000c3269786]
================================================5
print mapping result
root
|-- sensorid: string (nullable = true)
|-- gatewaymac: string (nullable = true)
|-- sensormac: string (nullable = true)
|-- sensorfrequency: string (nullable = true)
|-- status: string (nullable = true)
|-- sensorlocationid: string (nullable = true)
|-- sensorlocationname: string (nullable = true)
|-- company: string (nullable = true)
|-- plant: string (nullable = true)
|-- department: string (nullable = true)
|-- building: string (nullable = true)
|-- floor: string (nullable = true)
[000000000000a10275bc,000000000000a102,e85ce9b9d2c25000 ,00:05,Y,PA030,A1F-WM-S02,IAC,IACP,WM,A,1]
[000000000000b117160c,000000000000b117,33915a79e5c25000 ,00:05,Y,PB011,B1F-WM-S01,IAC,IACP,WM,B,1]
[000000000000a309024b,000000000000a309,afdab2efbbc25000 ,00:00,N,PA101,A3F-MP6-R01,IAC,IACP,MP6,A,3]
[000000000000c6294109,000000000000c629,383cca8e45c25000 ,00:05,Y,PC017,C6F-WM-S01,IAC,IACP,WM,C,6]
[000000000000a205e52e,000000000000a205,8d83303cf4c25000 ,00:00,N,PA063,A2F-MP6-R04,IAC,IACP,MP6,A,2]
====Finish====
In the end I found the answer: I had forgotten to set the master to the Spark standalone cluster, so I was submitting the Spark job to the local Spark server.
After setting the master to the Spark standalone cluster, the job works correctly. It was probably because the local Spark server did not have enough cores to execute the tasks. (By the way, it is an old machine.)
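Concretely, the fix amounts to pointing `spark-submit` at the standalone cluster's master URL. A sketch of the corrected submit command (`spark://<master-host>:7077` is a placeholder, not from the original post; the real URL is shown at the top of the Spark master web UI):

```shell
# Submit to the standalone cluster instead of defaulting to a local master.
/usr/bin/spark-submit \
  --class com.test.c2c \
  --master spark://<master-host>:7077 \
  --jars file:///home/iotcloud/Documents/grace/spark/spark-cassandra-connector-1.6.3-s_2.10.jar \
  file:///home/iotcloud/Documents/grace/spark/C2C_1205.jar
```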