我正试图修改KafkaWordCount spark流示例以接收字节流。这是我到目前为止的代码:
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("SiMod").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
var event: Event = null
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER)
最后一行-
val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER)
在IntelliJ中给出错误,尽管据我所见,我的用法与其他示例相同。
Error:(35, 41) overloaded method value createStream with alternatives:
(jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyTypeClass: Class[String],valueTypeClass: Class[Array[Byte]],keyDecoderClass: Class[kafka.serializer.DefaultDecoder],valueDecoderClass: Class[kafka.serializer.DefaultDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Map[String,Integer],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,Array[Byte]] <and>
(ssc: org.apache.spark.streaming.StreamingContext,kafkaParams: scala.collection.immutable.Map[String,String],topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)(implicit evidence$1: scala.reflect.ClassTag[String], implicit evidence$2: scala.reflect.ClassTag[Array[Byte]], implicit evidence$3: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$4: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, Array[Byte])]
cannot be applied to (org.apache.spark.streaming.StreamingContext, String, String, scala.collection.immutable.Map[String,Int])
val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap)
我该怎么办?
尝试用字符串解码器代替Key:
val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER)