Spark createStream error in IntelliJ (Scala plugin) when creating a stream that decodes byte arrays



I am trying to modify the KafkaWordCount Spark Streaming example to receive a stream of bytes. Here is my code so far:

    def main(args: Array[String]) {
      if (args.length < 4) {
        System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
        System.exit(1)
      }
      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("SiMod").setMaster("local[2]")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")
      var event: Event = null
      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER)

The last line:

val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER)

gives an error in IntelliJ, even though as far as I can see my usage is the same as in other examples.

Error:(35, 41) overloaded method value createStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyTypeClass:     Class[String],valueTypeClass: Class[Array[Byte]],keyDecoderClass: Class[kafka.serializer.DefaultDecoder],valueDecoderClass: Class[kafka.serializer.DefaultDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Map[String,Integer],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,Array[Byte]] <and>
(ssc: org.apache.spark.streaming.StreamingContext,kafkaParams: scala.collection.immutable.Map[String,String],topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)(implicit evidence$1: scala.reflect.ClassTag[String], implicit evidence$2: scala.reflect.ClassTag[Array[Byte]], implicit evidence$3: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$4: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, Array[Byte])]
cannot be applied to (org.apache.spark.streaming.StreamingContext, String, String, scala.collection.immutable.Map[String,Int])
 val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap)

What should I do?

Try a `StringDecoder` for the key instead. `DefaultDecoder` is a `Decoder[Array[Byte]]`, so using it as the key decoder forces the key type to `Array[Byte]`, which conflicts with the declared `String` key type. Each decoder type parameter must produce the corresponding key/value type:

val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER)
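To see why the original call could not typecheck, here is a minimal, self-contained sketch. The `Decoder`, `StringDecoder`, and `DefaultDecoder` below are hypothetical stand-ins that mirror the shape of the `kafka.serializer` classes (they are not the real Spark/Kafka API), and `decodePair` is an invented helper that plays the role of `createStream`'s type constraint:

```scala
// Hypothetical stand-ins mirroring the kafka.serializer.Decoder hierarchy,
// illustrating why each type parameter must match its decoder's output type.
trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T }

class DefaultDecoder extends Decoder[Array[Byte]] {
  // Passes the raw bytes through unchanged.
  def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}

class StringDecoder extends Decoder[String] {
  // Decodes the bytes as a UTF-8 string.
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Invented helper with a createStream-like constraint: the key decoder must
// be a Decoder[K] and the value decoder a Decoder[V].
def decodePair[K, V](kd: Decoder[K], vd: Decoder[V],
                     key: Array[Byte], value: Array[Byte]): (K, V) =
  (kd.fromBytes(key), vd.fromBytes(value))

// Compiles: String key via StringDecoder, raw bytes via DefaultDecoder.
val (k, v) = decodePair(new StringDecoder, new DefaultDecoder,
  "id-1".getBytes("UTF-8"), Array[Byte](1, 2, 3))

// decodePair[String, Array[Byte]](new DefaultDecoder, new DefaultDecoder, ...)
// would NOT compile: DefaultDecoder is a Decoder[Array[Byte]], never a
// Decoder[String] -- the same mismatch the overload-resolution error reports.
```

The compiler error in the question is Scala failing overload resolution rather than reporting the mismatch directly: with `DefaultDecoder` in the key position, no `createStream` overload fits the `[String, Array[Byte], ...]` type arguments, so every alternative is rejected.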
