Parsing a JSON string stored in a Scala variable



Environment: Spark 1.6, Scala

Through an API call, I use curl from Scala to fetch JSON data and store it in the variable RAWDATA, and I save the variable's contents as a file in HDFS. Now I want to parse the data held in the variable (RAWDATA) and create a table from it, since the variable already contains the data as a string (rather than reading the saved file back from HDFS). I tried the approach below, but it fails with an error. I am using a scheduler to fetch updated records at 30-second intervals. Is there a better way to achieve this?

import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.sys.process._

import akka.actor.ActorSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConnTest extends App {
  val conf = new SparkConf()
  val sc = new SparkContext(conf.setAppName("Spark Ingestion").setMaster("local[*]"))
  val hivecontext = new HiveContext(sc)
  var run_id = java.time.LocalDate.now
  val format = new SimpleDateFormat("YYYYMMddHHmmss")
  val actorSystem = ActorSystem()
  val scheduler = actorSystem.scheduler

  val task = new Runnable {
    def run() {
      val dt = format.format(Calendar.getInstance().getTime())
      println(dt)
      val writeFilePath = "vlinkAlarm_" + dt + ".json"
      val rdd1 = sc.parallelize(jsonWriter(writeFilePath)) // ERROR: No Spark Context
      rdd1.foreach(println)
    }
  }

  implicit val executor = actorSystem.dispatcher
  scheduler.schedule(
    initialDelay = Duration(5, TimeUnit.SECONDS),
    interval = Duration(30, TimeUnit.SECONDS),
    runnable = task)

  val uri = "hdfs://quickstart.cloudera:8020"
  val conf1 = new Configuration()
  conf1.set("fs.defaultFS", uri)
  val fs = FileSystem.get(conf1)

  def jsonWriter(fileName: String): String = {
    println(fileName)
    // Shell out to curl and capture the JSON response as a string.
    val rawdata = "curl http://services.groupkt.com/state/get/USA/all".!!
    rawdata
  }
}

===========

16/12/13 15:53:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
20161213155329
vlinkAlarm_20161213155329.json
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 44013    0 44013    0     0  68060      0 --:--:-- --:--:-- --:--:-- 68026
[ERROR] [12/13/2016 15:53:30.532] [default-akka.actor.default-dispatcher-2] [TaskInvocation] Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

and a little later:

The currently active SparkContext was created at:
(No active SparkContext.)
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

My JSON data looks like this:

{
    "results": [{
        "id": "6475867",
        "date": "2016-12-09",
        "time": "16:50:49",
        "varbinds": ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
    }, {
        "id": "6475866",
        "date": "2016-12-09",
        "time": "16:50:05",
        "varbinds": ["4", "192.255.54.154:1136", "CASAH 4", "52", "6", "", "", "", "", "", "", "", "", "", "", ""]
    }]
}
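
To make the goal concrete: from that payload I ultimately want a table with one row per element of the results array. In Spark 1.6 something roughly like the following should do it, I think (just a sketch, assuming the hivecontext and sc from above and the JSON string in rawdata):

import org.apache.spark.sql.functions.{col, explode}

// Sketch only: hivecontext, sc and rawdata are assumed from the code above.
val df = hivecontext.read.json(sc.parallelize(rawdata :: Nil))
// One row per element of the `results` array.
val flat = df.select(explode(col("results")).as("r"))
  .select(col("r.id"), col("r.date"), col("r.time"), col("r.varbinds"))
flat.show()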

Thanks,
Hossain

I could not fully resolve this issue, but I found a workaround: instead of the scheduler, I simply use a thread that sleeps inside a loop. The following code parses the JSON string stored in the variable rawdata.

for (i <- 1 to 10) {
  val url = "http://10.51.253.11:8082/vistalink/1/alarm.json?isVerbose=true"
  // Fetch the JSON payload as a single string.
  val rawdata = scala.io.Source.fromURL(url).mkString
  // Wrap the string in a one-element RDD so it can be read as JSON.
  val RDDFromString = sc.parallelize(rawdata :: Nil)
  val DF = hivecontext.read.json(RDDFromString)
  DF.printSchema()
  // Wait 30 seconds before fetching the next batch.
  Thread.sleep(1000 * 30)
}
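
To actually get a queryable table out of this, the parsed DataFrame can presumably be registered as a temporary table inside the loop and queried through the HiveContext (again only a sketch; the table name alarms is just an example):

// Sketch: expose the parsed DataFrame to SQL; "alarms" is an example name.
DF.registerTempTable("alarms")
hivecontext.sql("SELECT * FROM alarms").show()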

Thanks,
Hossain

Latest update