Spark job hangs without any exception when collecting results from joining two DataFrames



I am trying to query two tables from Cassandra into two DataFrames, and then join those two DataFrames into a single DataFrame (result).

I can get the correct result, and the Spark job finishes normally on my own machine. However, when I submit it to the Spark server (local mode), the job just hangs without any exception or error message, and still has not finished after an hour, until I press Ctrl+C to stop it.

I don't know why the job doesn't work on the Spark server, or what the difference is between running it in Eclipse and on the Spark server. If something were actually wrong, wouldn't it throw some exception instead of just hanging?

Any suggestions? Thanks~

Submit command:

/usr/bin/spark-submit --class com.test.c2c --jars file:///home/iotcloud/Documents/grace/spark/spark-cassandra-connector-1.6.3-s_2.10.jar file:///home/iotcloud/Documents/grace/spark/C2C_1205.jar

Here is my Scala code:

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

object c2c {
  def main(args: Array[String]) {
    println("Start...")
    // Note: the master is hardcoded to "local" here
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.2.1.67")
      .setAppName("ConnectToCassandra")
      .setMaster("local")
    val sc = new SparkContext(conf)
    println("Cassandra setting done...")

    println("================================================1")
    println("Start to save to cassandra...")
    // Query both Cassandra tables into DataFrames
    val cc = new CassandraSQLContext(sc)
    cc.setKeyspace("iot_test")
    val df_info = cc.sql("select gatewaymac,sensormac,sensorid,sensorfrequency,status from tsensor_info where gatewaymac != 'failed'")
    val df_loc = cc.sql("select sensorlocationid,sensorlocationname,company,plant,department,building,floor,sensorid from tsensorlocation_info where sensorid != 'NULL'")
    println("================================================2")
    println("registerTempTable...")
    df_info.registerTempTable("info")
    df_loc.registerTempTable("loc")

    println("================================================4")
    println("mapping table...")
    println("===info===")
    df_info.printSchema()
    df_info.take(5).foreach(println)
    println("===location===")
    df_loc.printSchema()
    df_loc.take(5).foreach(println)
    println("================================================5")
    println("print mapping result")
    // Join the two DataFrames on their shared sensorid column
    val result = df_info.join(df_loc, "sensorid")
    result.registerTempTable("ref")
    result.printSchema()
    result.take(5).foreach(println)
    println("====Finish====")
    sc.stop()
  }
}
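
As an aside, the temp tables registered above (info, loc, ref) are never actually queried. If the join were expressed in SQL against them instead of through the DataFrame API, a minimal sketch (reusing the same CassandraSQLContext cc) might look like this:

// Alternative sketch: the same join written as SQL over the registered temp tables.
val resultSql = cc.sql(
  """select i.sensorid, i.gatewaymac, i.sensormac, i.sensorfrequency, i.status,
    |       l.sensorlocationid, l.sensorlocationname, l.company, l.plant,
    |       l.department, l.building, l.floor
    |from info i join loc l on i.sensorid = l.sensorid""".stripMargin)
resultSql.take(5).foreach(println)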

Normal result in Eclipse:

Cassandra setting done...
================================================1
Start to save to cassandra...
================================================2
registerTempTable...
================================================4
mapping table...
===info===
root
 |-- gatewaymac: string (nullable = true)
 |-- sensormac: string (nullable = true)
 |-- sensorid: string (nullable = true)
 |-- sensorfrequency: string (nullable = true)
 |-- status: string (nullable = true)
[0000aaaaaaaaat7f,e9d050f0ebc25000    ,0000aaaaaaaaat7f3242,null,N]
[000000000000b219,c879b4f921c25000    ,000000000000b2193590,00:01,N]
[0000aaaaaaaaaabb,2c153cf9f0c25000    ,0000aaaaaaaaaabba353,null,Y]
[000000000000a412,17da712795c25000    ,000000000000a4126156,00:05,Y]
[000000000000a104,b2a4b8b7a6c25000    ,000000000000a1046340,00:01,N]
===location===
root
 |-- sensorlocationid: string (nullable = true)
 |-- sensorlocationname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- plant: string (nullable = true)
 |-- department: string (nullable = true)
 |-- building: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- sensorid: string (nullable = true)
[JA092,A1F-G-L00-S066,IAC,IACJ,MT,A,1,000000000000a108a19f]
[JA044,A2F-I-L00-S037,IAC,IACJ,MT,A,2,000000000000a2024246]
[JA111,A2F-C-L00-S076,IAC,IACJ,MPA,A,2,000000000000a210c710]
[PA041,A1F-SMT-S03,IAC,IACP,SMT,A,1,000000000000a10354c1]
[PC010,C3F-IQC-S03,IAC,IACP,IQC,C,3,000000000000c3269786]
================================================5
print mapping result
root
 |-- sensorid: string (nullable = true)
 |-- gatewaymac: string (nullable = true)
 |-- sensormac: string (nullable = true)
 |-- sensorfrequency: string (nullable = true)
 |-- status: string (nullable = true)
 |-- sensorlocationid: string (nullable = true)
 |-- sensorlocationname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- plant: string (nullable = true)
 |-- department: string (nullable = true)
 |-- building: string (nullable = true)
 |-- floor: string (nullable = true)
[000000000000a10275bc,000000000000a102,e85ce9b9d2c25000    ,00:05,Y,PA030,A1F-WM-S02,IAC,IACP,WM,A,1]
[000000000000b117160c,000000000000b117,33915a79e5c25000    ,00:05,Y,PB011,B1F-WM-S01,IAC,IACP,WM,B,1]
[000000000000a309024b,000000000000a309,afdab2efbbc25000    ,00:00,N,PA101,A3F-MP6-R01,IAC,IACP,MP6,A,3]
[000000000000c6294109,000000000000c629,383cca8e45c25000    ,00:05,Y,PC017,C6F-WM-S01,IAC,IACP,WM,C,6]
[000000000000a205e52e,000000000000a205,8d83303cf4c25000    ,00:00,N,PA063,A2F-MP6-R04,IAC,IACP,MP6,A,2]
====Finish====

It turned out the answer was that I forgot to set the master to the Spark standalone cluster; I had been submitting the Spark job to the local Spark server.

After setting the master to the Spark standalone cluster, the job works fine. Perhaps the local Spark server simply didn't have enough cores to execute the tasks. (By the way, it is an old machine.)
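
For reference, the fix amounts to pointing the job at the standalone cluster's master rather than hardcoding "local". A minimal sketch follows; the master URL spark://10.2.1.67:7077 is a placeholder, so substitute your cluster's actual master host and port:

// Corrected SparkConf: target the standalone cluster instead of "local".
// The master URL below is a placeholder, not taken from the original post.
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "10.2.1.67")
  .setAppName("ConnectToCassandra")
  .setMaster("spark://10.2.1.67:7077") // was .setMaster("local")

Alternatively, leave setMaster out of the code entirely and supply --master spark://... on the spark-submit command line; the same jar can then run locally or on the cluster without recompiling.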
