如何在启动Spark Streaming进程时加载历史数据,并计算正在运行的聚合



我在ElasticSearch集群中有一些与销售相关的JSON数据,我想使用Spark Streaming(使用Spark 1.4.1)通过Kafka动态聚合来自我的电子商务网站的传入销售事件,以获得用户总销售额的当前视图(在收入和产品方面)。

我读到的文档中不太清楚的是,我如何在Spark应用程序启动时从ElasticSearch加载历史数据,并计算例如每用户的总体收入(基于历史和来自Kafka的收入)。

我有以下(工作)代码连接到我的Kafka实例并接收JSON文档:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
object ReadFromKafka {
  def main(args: Array[String]) {
    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    //Iterate
    messages.foreachRDD { rdd =>
      //If data is present, continue
      if (rdd.count() > 0) {
        //Create SQLContect and parse JSON
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)
        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()
      }
    }
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

我知道ElasticSearch有一个插件(https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read),但是我不太清楚如何在启动时集成读取,以及流计算过程来聚合历史数据和流数据。

非常感谢您的帮助!

rdd是不可变的,因此在创建rdd之后,您不能向其添加数据,例如使用新事件更新收入。

您可以做的是将现有数据与新事件联合起来创建一个新的RDD,然后您可以将其用作当前总数。例如…

var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
    currentTotal = currentTotal.union(rdd)
}

在本例中,我们将currentTotal命名为var,因为当它与传入数据合并时,它将被对新RDD的引用所替换。

在合并之后,您可能想要执行一些进一步的操作,例如减少属于同一Key的值,但是您了解情况。

如果您使用这种技术,请注意您的RDD的谱系将会增长,因为每个新创建的RDD将引用它的父RDD。这可能导致堆栈溢出样式沿袭问题。要解决这个问题,您可以定期调用RDD上的checkpoint()

相关内容

  • 没有找到相关文章

最新更新