I have the following code:
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("KafkaReceiver")
  .set("spark.cassandra.connection.host", "192.168.0.78")
  .set("spark.cassandra.connection.keep_alive_ms", "20000")
  .set("spark.executor.memory", "2g")
  .set("spark.driver.memory", "4g")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "3")
  .set("spark.executor.cores", "3")
  .set("spark.shuffle.service.enabled", "false")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.io.compression.codec", "snappy")
  .set("spark.rdd.compress", "true")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "200")
  .set("spark.streaming.receiver.maxRate", "500")

val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "192.168.0.113:9092",
  "group.id" -> "test-group-aditya",
  "auto.offset.reset" -> "largest")
val topics = Set("random")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
I am running the code with spark-submit using the following command:
dse> bin/dse spark-submit --class test.kafkatesting /home/aditya/test.jar
I have a three-node Cassandra DSE cluster installed on separate machines. Whenever I run the application, it ingests a large amount of data and starts building up a queue of active batches, which creates a backlog and a long scheduling delay. How can I improve performance and control the queue so that a new batch is received only after the current batch has finished processing?
I found the solution by making some optimizations in the code. Instead of saving the RDD directly, convert it to a DataFrame and save the DataFrame to Cassandra. Also, increasing the number of cores and the executor memory produced good results.

Thanks.
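The write pattern described above can be sketched as follows. This is a minimal sketch, assuming the spark-cassandra-connector is on the classpath; the `Event` case class and the `my_ks`/`my_table` keyspace and table names are illustrative placeholders, not from the original post:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}

// Hypothetical case class mirroring the target Cassandra table's columns.
case class Event(id: String, payload: String)

// Instead of saving the RDD directly, convert each micro-batch RDD to a
// DataFrame and write it through the connector's DataFrame source, which
// batches inserts more efficiently than row-at-a-time RDD saves.
def saveBatch(sqlContext: SQLContext, rdd: RDD[Event]): Unit = {
  import sqlContext.implicits._
  rdd.toDF()
    .write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "my_ks", "table" -> "my_table")) // illustrative names
    .mode(SaveMode.Append)
    .save()
}
```

In the streaming job this would typically be called from `kafkaStream.foreachRDD { rdd => ... }`, once per micro-batch.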