Environment: Spark 1.6, Scala
Through an API call, I use curl from Scala to store JSON data in a variable `rawdata`, and I save the variable's contents as a file in HDFS. Now I want to parse the data directly from the variable (`rawdata`) and create a table from it, since it already holds the data as a string (rather than reading the saved file back from HDFS). I tried the approach below, but it fails with an error. I am using a scheduler to fetch updated records at 30-second intervals. Is there a better way to achieve my goal?
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.sys.process._

import akka.actor.ActorSystem
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConnTest extends App {
  val conf = new SparkConf()
  val sc = new SparkContext(conf.setAppName("Spark Ingestion").setMaster("local[*]"))
  val hivecontext = new HiveContext(sc)
  var run_id = java.time.LocalDate.now
  val format = new SimpleDateFormat("yyyyMMddHHmmss")
  val actorSystem = ActorSystem()
  val scheduler = actorSystem.scheduler

  val task = new Runnable {
    def run() {
      val dt = format.format(Calendar.getInstance().getTime())
      println(dt)
      val writeFilePath = "vlinkAlarm_" + dt + ".json"
      val rdd1 = sc.parallelize(Seq(jsonWriter(writeFilePath))) // ERROR: no Spark context
      rdd1.foreach(println)
    }
  }

  implicit val executor = actorSystem.dispatcher
  scheduler.schedule(
    initialDelay = Duration(5, TimeUnit.SECONDS),
    interval = Duration(30, TimeUnit.SECONDS),
    runnable = task)

  val uri = "hdfs://quickstart.cloudera:8020"
  val conf1 = new Configuration()
  conf1.set("fs.defaultFS", uri)
  val fs = FileSystem.get(conf1)

  def jsonWriter(fileName: String): String = {
    println(fileName)
    val rawdata = "curl http://services.groupkt.com/state/get/USA/all".!!
    rawdata
  }
}
===========
16/12/13 15:53:25 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
20161213155329
vlinkAlarm_20161213155329.json
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 44013 0 44013 0 0 68060 0 --:--:-- --:--:-- --:--:-- 68026
[ERROR] [12/13/2016 15:53:30.532] [default-akka.actor.default-dispatcher-2] [TaskInvocation] Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
and, after a while:
The currently active SparkContext was created at:
(No active SparkContext.)
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
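For context on why this fails: with `extends App`, the whole body runs as the delayed-init `main`, and once `main` returns the driver JVM begins shutting down and the SparkContext is stopped, while the Akka scheduler keeps firing its task on dispatcher threads. A minimal sketch of the timing issue, using plain `java.util.concurrent` instead of Akka (the names `PollingKeepAlive` and `runFor` are illustrative, not from the original code): the main thread has to block for as long as the periodic task should keep running.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PollingKeepAlive {
  // Schedules `task` every `periodMs` ms, blocks the calling thread for
  // `totalMs` ms, then stops the scheduler. Blocking here is what keeps the
  // JVM (and anything created on the main thread, like a SparkContext)
  // alive while the background task fires. Returns how often it fired.
  def runFor(totalMs: Long, periodMs: Long)(task: () => Unit): Int = {
    val fired = new AtomicInteger(0)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = { task(); fired.incrementAndGet(); () }
    }, 0L, periodMs, TimeUnit.MILLISECONDS)
    Thread.sleep(totalMs) // without this, main returns and everything is torn down
    scheduler.shutdownNow()
    fired.get()
  }

  def main(args: Array[String]): Unit = {
    val n = runFor(totalMs = 350, periodMs = 100)(() => println("polling..."))
    println(s"task fired $n times")
  }
}
```

In the original program the equivalent fix would be to block after `scheduler.schedule(...)` (for example on a latch or an `Await` that never completes) so `App`'s `main` does not fall through and stop the SparkContext.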
My JSON data looks like:
{
"results": [{
"id": "6475867",
"date": "2016-12-09",
"time": "16:50:49",
"varbinds": ["", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
}, {
"id": "6475866",
"date": "2016-12-09",
"time": "16:50:05",
"varbinds": ["4", "192.255.54.154:1136", "CASAH 4", "52", "6", "", "", "", "", "", "", "", "", "", "", ""]
}]
}
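With this shape, `read.json` on a one-element RDD yields a single row whose `results` column is an array of structs, so to get one row per alarm you would explode the array and, if a queryable table is the goal, register the result. A sketch under Spark 1.6, assuming the `sc` and `hivecontext` from the code above and a `rawdata` string holding this JSON (the table name `vlink_alarms` is illustrative):

```
import org.apache.spark.sql.functions.explode

val df = hivecontext.read.json(sc.parallelize(rawdata :: Nil))
// printSchema() shows roughly:
// root
//  |-- results: array
//  |     |-- element: struct (date, id, time, varbinds)
val alarms = df.select(explode(df("results")).as("r"))
  .select("r.id", "r.date", "r.time", "r.varbinds")
alarms.registerTempTable("vlink_alarms") // then query via hivecontext.sql(...)
```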
Thanks,
Hossain
I could not fully solve this problem, but I found a workaround: use a plain loop with a sleep instead of the scheduler. The following code parses the JSON string stored in the variable rawdata.
for (i <- 1 to 10) {
  val url = "http://10.51.253.11:8082/vistalink/1/alarm.json?isVerbose=true"
  val rawdata = scala.io.Source.fromURL(url).mkString
  val RDDFromString = sc.parallelize(rawdata :: Nil)
  val DF = hivecontext.read.json(RDDFromString)
  DF.printSchema()
  Thread.sleep(1000 * 30)
}
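One caveat with this loop: a single failed HTTP call (`Source.fromURL` throwing on a timeout or connection reset) kills the entire poller. A small retry wrapper keeps it alive across transient failures; this is a sketch, not part of the original code, and the `Retry.retry` helper and `maxAttempts` parameter are hypothetical names:

```scala
import scala.util.{Failure, Try}

object Retry {
  // Evaluates `fetch` up to `maxAttempts` times, returning the first
  // success or the last failure. By-name parameter so each retry
  // re-executes the fetch. Hypothetical helper, not a Spark or stdlib API.
  def retry[T](maxAttempts: Int)(fetch: => T): Try[T] =
    Try(fetch) match {
      case Failure(_) if maxAttempts > 1 => retry(maxAttempts - 1)(fetch)
      case other => other
    }

  def main(args: Array[String]): Unit = {
    var calls = 0
    val result = retry(3) {
      calls += 1
      if (calls < 3) sys.error("transient network error") // first two attempts fail
      "{\"results\": []}"
    }
    println(result)
  }
}
```

In the loop body one would wrap the fetch as `Retry.retry(3)(scala.io.Source.fromURL(url).mkString)` and build the RDD and DataFrame only on `Success`, logging and skipping the iteration on `Failure` instead of crashing.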
Thanks,
Hossain