环境: Kafka 10, Spark2.1
我正在尝试存储 Kakfa 偏移外部存储。在浏览了Apache Spark网站和一些在线研究之后,我能够编写以下代码。现在得到错误-
"Error:(190, 7) object Assign in package kafka010 cannot be accessed in package org.apache.spark.streaming.kafka010
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)"
我的代码:
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object offTest {
def main(args: Array[String]) {
val Array(impalaHost, brokers, topics, consumerGroupId, ssl, truststoreLocation, truststorePassword, wInterval) = args
val sparkSession = SparkSession.builder
.config("spark.hadoop.parquet.enable.summary-metadata", "false")
.enableHiveSupport()
.getOrCreate
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(wInterval.toInt))
val isUsingSsl = ssl.toBoolean
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val commonParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"security.protocol" -> (if (isUsingSsl) "SASL_SSL" else "SASL_PLAINTEXT"),
"sasl.kerberos.service.name" -> "kafka",
"auto.offset.reset" -> "latest",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> consumerGroupId,
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val additionalSslParams = if (isUsingSsl) {
Map(
"ssl.truststore.location" -> truststoreLocation,
"ssl.truststore.password" -> truststorePassword
)
} else {
Map.empty
}
val kafkaParams = commonParams ++ additionalSslParams
val fromOffsets= Map[Object,Long](new TopicPartition(topics, 4) -> 4807048129L)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
// I will insert those values to other database later
}
}
val data= stream.map(record => (record.key, record.value))
data.foreachRDD(rdd1 => {
val value = rdd1.map(x => x._2)
if (! value.isEmpty()) {
value.foreach(println)
}
else
{println("no data")}
})
ssc.start()
ssc.awaitTermination()
}
}
错误在以下行-
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets(
我做了一些研究,这背后的原因是进口包之间的冲突。但是,我无法解决这个问题。任何类型的帮助或代码示例将不胜感激。
谢谢赖莎
您需要
通过 ConsumerStrategies
对象创建一个 Assign
实例,如下所示:
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)