"object Assign in package kafka010 cannot be accessed " 在卡夫卡 Ofset 管理期间面临的问题



Environment: Kafka 0.10, Spark 2.1

I am trying to store Kafka offsets in external storage. After going through the Apache Spark site and some online research, I was able to write the following code. Now I am getting this error:

"Error:(190, 7) object Assign in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)"

My code:

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object offTest {
  def main(args: Array[String]) {
    val Array(impalaHost, brokers, topics, consumerGroupId, ssl, truststoreLocation, truststorePassword, wInterval) = args
    val sparkSession = SparkSession.builder
      .config("spark.hadoop.parquet.enable.summary-metadata", "false")
      .enableHiveSupport()
      .getOrCreate
    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(wInterval.toInt))
    val isUsingSsl = ssl.toBoolean

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val commonParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "security.protocol" -> (if (isUsingSsl) "SASL_SSL" else "SASL_PLAINTEXT"),
      "sasl.kerberos.service.name" -> "kafka",
      "auto.offset.reset" -> "latest",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> consumerGroupId,
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val additionalSslParams = if (isUsingSsl) {
      Map(
        "ssl.truststore.location" -> truststoreLocation,
        "ssl.truststore.password" -> truststorePassword
      )
    } else {
      Map.empty
    }
    val kafkaParams = commonParams ++ additionalSslParams
    val fromOffsets = Map[Object, Long](new TopicPartition(topics, 4) -> 4807048129L)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // I will insert those values to other database later
      }
    }
    val data = stream.map(record => (record.key, record.value))
    data.foreachRDD { rdd1 =>
      val value = rdd1.map(x => x._2)
      if (!value.isEmpty()) {
        value.foreach(println)
      } else {
        println("no data")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

The error occurs on this line:

Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)

From my research, I suspect the cause is a conflict between the imported packages, but I have not been able to resolve it. Any kind of help or code example would be greatly appreciated.

Thanks, Raisa

You need to create the Assign instance through the ConsumerStrategies object. The Assign class in org.apache.spark.streaming.kafka010 is package-private, which is exactly what the compiler error is telling you (it is an access restriction, not an import conflict); ConsumerStrategies.Assign is its public factory method:

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
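Note also that ConsumerStrategies.Assign expects an Iterable[TopicPartition] and a Map[TopicPartition, Long], so the fromOffsets map should be keyed by TopicPartition rather than Object; otherwise fromOffsets.keys.toList will not typecheck against the first parameter. A minimal corrected declaration would be:

// Key the offsets map by TopicPartition so that fromOffsets.keys.toList
// is an Iterable[TopicPartition], as ConsumerStrategies.Assign expects.
val fromOffsets = Map[TopicPartition, Long](
  new TopicPartition(topics, 4) -> 4807048129L
)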

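As for the original goal of storing offsets externally (the "I will insert those values to other database later" part), a common pattern is to write the offset ranges from the driver inside foreachRDD, after the batch's output has succeeded. Below is a minimal sketch; the JDBC URL and the kafka_offsets table are placeholders for whatever store you use:

import java.sql.DriverManager

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch first, so offsets are saved only for successful batches ...

  // This block runs on the driver. "jdbc:..." and kafka_offsets are
  // placeholders; in practice you would upsert keyed on (topic, partition).
  val conn = DriverManager.getConnection("jdbc:...")
  try {
    val stmt = conn.prepareStatement(
      "INSERT INTO kafka_offsets (topic, partition, until_offset) VALUES (?, ?, ?)")
    offsetRanges.foreach { o =>
      stmt.setString(1, o.topic)
      stmt.setInt(2, o.partition)
      stmt.setLong(3, o.untilOffset)
      stmt.executeUpdate()
    }
    stmt.close()
  } finally {
    conn.close()
  }
}

On restart, you would read these rows back into the fromOffsets map before building the ConsumerStrategy.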