我正在使用mapwithsafe开发一个spark流应用程序
trackStateFunc的定义如下:
def trackStateFunc(batchTime: Time, key: KEY_JOINS, value: Option[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])],state: State[HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]])
: Option[(KEY_JOINS, HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])])] =
{
if(key.SUBSCRIBER_ID=="621300315445434" && key.CHARGING_ID=="3803685246")
println("Key is :"+key.toString()+"Value is :"+value.toString()+"State is :"+state.getOption().toString())
if (state.isTimingOut()) {
//write state to database
println("State with key"+key.toString() +"is timing out")
return None
}
//var c:HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]
else if (state.exists) {
var c = state.get()
println("value is"+value.get.toString())
println("c is"+c.toString())
if(!c.entryExists(key, _==value.get)) {
c.addBinding(key, value.get)
state.update(c)
}
//println("State is :"+state.get().toString())
Some(key, c)
}
else {
val a = new HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]
/*
a.addBinding(key, value.getOrElse((Some(new DXE_NOR("temp DXE", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")),
Some(NW_NOR("temp NW", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")),
Some(CHG_NOR("temp CHG", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")))))
*/
a.addBinding(key, value.get)
state.update(a)
Some(key, a)
}
}
然而,当转换以下值时,我得到如下异常…
value is(None,Some(NW_NOR.SUBSCRIBER_ID=621300245125288, NW_NOR.CHARGING_ID=2635156471, NW_NOR.Node_ID=, NW_NOR.Start_Time=2016-05-01 14:06:31 +01:00 ,NW_NOR.CDR_SOURCE=, NW_NOR.FILE_POPULATION_TIME=, NW_NOR.CDR_REASON=recordClosure, NW_NOR.PARTIAL_SEQ_NUM=, NW_NOR.PARTIAL_IND=, NW_NOR.TERMINATION_CAUSE=CauseForRecClosing_sGSNChange, NW_NOR.SOURCE_SEQ_NUM=1, NW_NOR.IMSI=621300245125288, NW_NOR.MSISDN=92348E+14, NW_NOR.IMEI=8.6047E+15, NW_NOR.UE_IP_ADDRESS=10.145.221.89, NW_NOR.EVENT_START_TIME=2016-05-01 14:06:31 +01:00, , NW_NOR.EVENT_END_TIME=2016-05-01 14:06:31 +01:00, NW_NOR.DURATION=0, NW_NOR.UPLINK_VOL=0, NW_NOR.DOWNLINK_VOL=0, NW_NOR.TOTAL_VOL=0, NW_NOR.RAT_TYPE=2, NW_NOR.HPMN=6213, NW_NOR.VPMN=62130, NW_NOR.ROAM_TYPE=,NW_NOR.GATEWAY_IP_ADDRESS=41.220.67.18, NW_NOR.SERVING_IP_ADDRESS=41.206.6.147$, NW_NOR.APN=web.gprs.mtnnigeria.net, NW_NOR.APN_TYPE=, NW_NOR.CHARGING_CHARACTERISTICS=400, NW_NOR.PAY_TYPE=, NW_NOR.RATING_GROUP=, NW_NOR.APN_RA_FLAG=, NW_NOR.ORIGINAL_FILE_NAME=ABMPG01_20160501140632_49495),None)
c isMap(KEY_JOINS.SUBSCRIBER_ID=621300245125288, KEY_JOIN.CHARGING_ID=2635156471 -> Set((None,Some(NW_NOR.SUBSCRIBER_ID=621300245125288, NW_NOR.CHARGING_ID=2635156471, NW_NOR.Node_ID=, NW_NOR.Start_Time=2016-05-01 14:06:31 +01:00 ,NW_NOR.CDR_SOURCE=, NW_NOR.FILE_POPULATION_TIME=, NW_NOR.CDR_REASON=recordClosure, NW_NOR.PARTIAL_SEQ_NUM=, NW_NOR.PARTIAL_IND=, NW_NOR.TERMINATION_CAUSE=CauseForRecClosing_sGSNChange, NW_NOR.SOURCE_SEQ_NUM=1, NW_NOR.IMSI=621300245125288, NW_NOR.MSISDN=2348133430001, NW_NOR.IMEI=8604700004664707, NW_NOR.UE_IP_ADDRESS=10.145.221.89, NW_NOR.EVENT_START_TIME=2016-05-01 14:06:31 +01:00, , NW_NOR.EVENT_END_TIME=2016-05-01 14:06:31 +01:00, NW_NOR.DURATION=0, NW_NOR.UPLINK_VOL=0, NW_NOR.DOWNLINK_VOL=0, NW_NOR.TOTAL_VOL=0, NW_NOR.RAT_TYPE=2, NW_NOR.HPMN=6213, NW_NOR.VPMN=62130, NW_NOR.ROAM_TYPE=,NW_NOR.GATEWAY_IP_ADDRESS=41.220.67.18, NW_NOR.SERVING_IP_ADDRESS=41.206.6.147$, NW_NOR.APN=web.gprs.mtnnigeria.net, NW_NOR.APN_TYPE=, NW_NOR.CHARGING_CHARACTERISTICS=0400, NW_NOR.PAY_TYPE=, NW_NOR.RATING_GROUP=, NW_NOR.APN_RA_FLAG=, NW_NOR.ORIGINAL_FILE_NAME=ABMPG01_20160501140632_49495),None), (None,Some(NW_NOR.SUBSCRIBER_ID=621300245125288, NW_NOR.CHARGING_ID=2635156471, NW_NOR.Node_ID=, NW_NOR.Start_Time=2016-05-01 14:06:31 +01:00 ,NW_NOR.CDR_SOURCE=, NW_NOR.FILE_POPULATION_TIME=, NW_NOR.CDR_REASON=recordClosure, NW_NOR.PARTIAL_SEQ_NUM=, NW_NOR.PARTIAL_IND=, NW_NOR.TERMINATION_CAUSE=normalRelease, NW_NOR.SOURCE_SEQ_NUM=2, NW_NOR.IMSI=621300245125288, NW_NOR.MSISDN=2348133430001, NW_NOR.IMEI=8604700004664707, NW_NOR.UE_IP_ADDRESS=10.145.221.89, NW_NOR.EVENT_START_TIME=2016-05-01 14:06:31 +01:00, , NW_NOR.EVENT_END_TIME=2016-05-01 14:07:43 +01:00, NW_NOR.DURATION=72, NW_NOR.UPLINK_VOL=288, NW_NOR.DOWNLINK_VOL=224, NW_NOR.TOTAL_VOL=512, NW_NOR.RAT_TYPE=2, NW_NOR.HPMN=6213, NW_NOR.VPMN=62130, NW_NOR.ROAM_TYPE=,NW_NOR.GATEWAY_IP_ADDRESS=41.220.67.18, NW_NOR.SERVING_IP_ADDRESS=41.206.6.147$, NW_NOR.APN=web.gprs.mtnnigeria.net, NW_NOR.APN_TYPE=, NW_NOR.CHARGING_CHARACTERISTICS=0400, NW_NOR.PAY_TYPE=, NW_NOR.RATING_GROUP=, NW_NOR.APN_RA_FLAG=, NW_NOR.ORIGINAL_FILE_NAME=ABMPG01_20160501140747_49534),None)))
2016-09-01 10:01:50,373 [Executor task launch worker-3] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 994.0 (TID 482)
java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.MultiMap
at StreamingEngineSt$.trackStateFunc(StreamEngineSt.scala:395)
我粘贴了C和value的确切值,同时得到了这个异常…我在
处得到异常 if(!c.entryExists(key, _==value.get))
但是c和value都有正确的类型
这对我来说有点奇怪,但我不得不将'Set'的类型更改为'ArrayBuffer',这样就不会得到任何类似的强制转换异常。也许,这是Spark(1.6)中的一个bug