scala.collection.mutable.HashMap不能被强制转换为scala.collection.mut



我正在使用mapwithsafe开发一个spark流应用程序

trackStateFunc的定义如下:

 def trackStateFunc(batchTime: Time, key: KEY_JOINS, value: Option[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])],state: State[HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]])
: Option[(KEY_JOINS, HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])])] =
{
 if(key.SUBSCRIBER_ID=="621300315445434" && key.CHARGING_ID=="3803685246")
 println("Key is :"+key.toString()+"Value is :"+value.toString()+"State is :"+state.getOption().toString())
if (state.isTimingOut()) {
  //write state to database
  println("State with key"+key.toString() +"is timing out")
  return None
}
//var c:HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]
else if (state.exists) {
   var c = state.get()

  println("value is"+value.get.toString())
  println("c is"+c.toString())
  if(!c.entryExists(key, _==value.get)) {
    c.addBinding(key, value.get)
    state.update(c)
  }
    //println("State is :"+state.get().toString())

  Some(key, c)
}
else {

  val a = new HashMap[KEY_JOINS, Set[(Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]] with MultiMap[KEY_JOINS, (Option[DXE_NOR], Option[NW_NOR], Option[CHG_NOR])]
  /*
  a.addBinding(key, value.getOrElse((Some(new DXE_NOR("temp DXE", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")),
    Some(NW_NOR("temp NW", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")),
    Some(CHG_NOR("temp CHG", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")))))
    */
  a.addBinding(key, value.get)
  state.update(a)
  Some(key, a)
}
}

然而,当转换以下值时,我得到如下异常…

value is(None,Some(NW_NOR.SUBSCRIBER_ID=621300245125288, NW_NOR.CHARGING_ID=2635156471, NW_NOR.Node_ID=, NW_NOR.Start_Time=2016-05-01 14:06:31 +01:00 ,NW_NOR.CDR_SOURCE=, NW_NOR.FILE_POPULATION_TIME=, NW_NOR.CDR_REASON=recordClosure, NW_NOR.PARTIAL_SEQ_NUM=, NW_NOR.PARTIAL_IND=, NW_NOR.TERMINATION_CAUSE=CauseForRecClosing_sGSNChange, NW_NOR.SOURCE_SEQ_NUM=1, NW_NOR.IMSI=621300245125288, NW_NOR.MSISDN=92348E+14, NW_NOR.IMEI=8.6047E+15, NW_NOR.UE_IP_ADDRESS=10.145.221.89, NW_NOR.EVENT_START_TIME=2016-05-01 14:06:31 +01:00, , NW_NOR.EVENT_END_TIME=2016-05-01 14:06:31 +01:00, NW_NOR.DURATION=0, NW_NOR.UPLINK_VOL=0, NW_NOR.DOWNLINK_VOL=0, NW_NOR.TOTAL_VOL=0, NW_NOR.RAT_TYPE=2, NW_NOR.HPMN=6213, NW_NOR.VPMN=62130, NW_NOR.ROAM_TYPE=,NW_NOR.GATEWAY_IP_ADDRESS=41.220.67.18, NW_NOR.SERVING_IP_ADDRESS=41.206.6.147$, NW_NOR.APN=web.gprs.mtnnigeria.net, NW_NOR.APN_TYPE=, NW_NOR.CHARGING_CHARACTERISTICS=400, NW_NOR.PAY_TYPE=, NW_NOR.RATING_GROUP=, NW_NOR.APN_RA_FLAG=, NW_NOR.ORIGINAL_FILE_NAME=ABMPG01_20160501140632_49495),None)
c isMap(KEY_JOINS.SUBSCRIBER_ID=621300245125288, KEY_JOIN.CHARGING_ID=2635156471 -> Set((None,Some(NW_NOR.SUBSCRIBER_ID=621300245125288, NW_NOR.CHARGING_ID=2635156471, NW_NOR.Node_ID=, NW_NOR.Start_Time=2016-05-01 14:06:31 +01:00 ,NW_NOR.CDR_SOURCE=, NW_NOR.FILE_POPULATION_TIME=, NW_NOR.CDR_REASON=recordClosure, NW_NOR.PARTIAL_SEQ_NUM=, NW_NOR.PARTIAL_IND=, NW_NOR.TERMINATION_CAUSE=CauseForRecClosing_sGSNChange, NW_NOR.SOURCE_SEQ_NUM=1, NW_NOR.IMSI=621300245125288, NW_NOR.MSISDN=2348133430001, NW_NOR.IMEI=8604700004664707, NW_NOR.UE_IP_ADDRESS=10.145.221.89, NW_NOR.EVENT_START_TIME=2016-05-01 14:06:31 +01:00, , NW_NOR.EVENT_END_TIME=2016-05-01 14:06:31 +01:00, NW_NOR.DURATION=0, NW_NOR.UPLINK_VOL=0, NW_NOR.DOWNLINK_VOL=0, NW_NOR.TOTAL_VOL=0, NW_NOR.RAT_TYPE=2, NW_NOR.HPMN=6213, NW_NOR.VPMN=62130, NW_NOR.ROAM_TYPE=,NW_NOR.GATEWAY_IP_ADDRESS=41.220.67.18, NW_NOR.SERVING_IP_ADDRESS=41.206.6.147$, NW_NOR.APN=web.gprs.mtnnigeria.net, NW_NOR.APN_TYPE=, NW_NOR.CHARGING_CHARACTERISTICS=0400, NW_NOR.PAY_TYPE=, NW_NOR.RATING_GROUP=, NW_NOR.APN_RA_FLAG=, NW_NOR.ORIGINAL_FILE_NAME=ABMPG01_20160501140632_49495),None), (None,Some(NW_NOR.SUBSCRIBER_ID=621300245125288, NW_NOR.CHARGING_ID=2635156471, NW_NOR.Node_ID=, NW_NOR.Start_Time=2016-05-01 14:06:31 +01:00 ,NW_NOR.CDR_SOURCE=, NW_NOR.FILE_POPULATION_TIME=, NW_NOR.CDR_REASON=recordClosure, NW_NOR.PARTIAL_SEQ_NUM=, NW_NOR.PARTIAL_IND=, NW_NOR.TERMINATION_CAUSE=normalRelease, NW_NOR.SOURCE_SEQ_NUM=2, NW_NOR.IMSI=621300245125288, NW_NOR.MSISDN=2348133430001, NW_NOR.IMEI=8604700004664707, NW_NOR.UE_IP_ADDRESS=10.145.221.89, NW_NOR.EVENT_START_TIME=2016-05-01 14:06:31 +01:00, , NW_NOR.EVENT_END_TIME=2016-05-01 14:07:43 +01:00, NW_NOR.DURATION=72, NW_NOR.UPLINK_VOL=288, NW_NOR.DOWNLINK_VOL=224, NW_NOR.TOTAL_VOL=512, NW_NOR.RAT_TYPE=2, NW_NOR.HPMN=6213, NW_NOR.VPMN=62130, NW_NOR.ROAM_TYPE=,NW_NOR.GATEWAY_IP_ADDRESS=41.220.67.18, NW_NOR.SERVING_IP_ADDRESS=41.206.6.147$, NW_NOR.APN=web.gprs.mtnnigeria.net, NW_NOR.APN_TYPE=, NW_NOR.CHARGING_CHARACTERISTICS=0400, NW_NOR.PAY_TYPE=, NW_NOR.RATING_GROUP=, NW_NOR.APN_RA_FLAG=, NW_NOR.ORIGINAL_FILE_NAME=ABMPG01_20160501140747_49534),None)))
2016-09-01 10:01:50,373 [Executor task launch worker-3] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 994.0 (TID 482)
java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.MultiMap
    at StreamingEngineSt$.trackStateFunc(StreamEngineSt.scala:395)

我粘贴了C和value的确切值,同时得到了这个异常…我在

处得到异常
 if(!c.entryExists(key, _==value.get))

但是c和value都有正确的类型

这对我来说有点奇怪,但我不得不将'Set'的类型更改为'ArrayBuffer',这样就不会得到任何类似的强制转换异常。也许,这是Spark(1.6)中的一个bug

最新更新