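Context: I'm using Apache Spark to aggregate running counts of different event types from logs. The logs are stored both in Cassandra, for historical analysis, and in Kafka, for real-time analysis. Each log has a date and an event type. For simplicity, let's assume I want to track the number of logs of a single type for each day.
We have two RDDs: a batch RDD built from Cassandra and a streaming RDD built from Kafka. Pseudocode: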
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD); // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterable<Tuple2<String, Integer>> call(Tuple2<String, String> data) { // Spark 1.x signature; in Spark 2.x+ it returns Iterator (return pairs.iterator())
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is a broadcast variable set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0); // 0 if this date has no state yet
int currentValue = 0; // sum of this batch's counts for the date
for (Integer count : counts) {
currentValue += count;
}
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
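jssc.start(); // jssc: the JavaStreamingContext behind KafkaUtils.createStream(...); the JavaSparkContext sc has no start()
jssc.awaitTermination();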
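For readers without a Spark cluster at hand, the data flow of the pseudocode can be modeled with plain Java collections. This is a sketch under stated assumptions, not Spark code: maps stand in for the RDDs and DStreams, and the Event class and the countByDate/onBatch methods are hypothetical names introduced only for illustration.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the two halves of the pseudocode. Maps stand in for
// the RDDs/DStreams; Event is a hypothetical stand-in for one log record.
class PipelineModel {
    static final class Event {
        final String date;          // the log's date key, e.g. "2014-10-16"
        final LocalDateTime time;   // when the event happened
        Event(String date, LocalDateTime time) { this.date = date; this.time = time; }
    }

    // Batch half: mapToPair(row -> (date, 1)) then reduceByKey(sum).
    static Map<String, Integer> countByDate(List<Event> rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : rows) {
            counts.merge(e.date, 1, Integer::sum);
        }
        return counts;
    }

    // Streaming half: one instance holds the per-date running state.
    private final LocalDateTime startTime;                       // like the broadcast startTime
    private final Map<String, Integer> state = new TreeMap<>();  // like the updateStateByKey state

    PipelineModel(LocalDateTime startTime) { this.startTime = startTime; }

    // Called once per micro-batch; returns a snapshot of the running state.
    Map<String, Integer> onBatch(List<Event> batch) {
        // flatMapToPair + reduceByKey: keep only logs after startTime, count per date
        Map<String, Integer> batchCounts = new HashMap<>();
        for (Event e : batch) {
            if (e.time.isAfter(startTime)) {
                batchCounts.merge(e.date, 1, Integer::sum);
            }
        }
        // updateStateByKey: new state = previous state (or 0) + this batch's sum;
        // dates with no new logs keep their previous state
        batchCounts.forEach((date, n) -> state.merge(date, n, Integer::sum));
        return new TreeMap<>(state);
    }
}
```

Note that the two results are kept completely separate: the batch map never sees streamed logs and the running state never sees the batch counts, which is the gap the question is about.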
Question: how can I combine the results of streamRDD with batchRDD? Suppose batchRDD
has the following data and this job is run on 2014-10-16:
("2014-10-15", 1000000)
("2014-10-16", 2000000)
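Because the Cassandra query only includes data up to the time the batch query started, we have to read from Kafka once the query has finished and consider only logs after the job's start time. We assume the query takes a long time. This means I need to combine the historical results with the streaming results.
To illustrate: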
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
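Then suppose that in the first stream batch we get the following data:
("2014-10-16", 1000)
Then I want to combine the batch RDD with this stream RDD so that the stream RDD now has the value:
("2014-10-16", 2001000)
Then suppose that in the second stream batch we get the following data:
("2014-10-16", 4000)
Then the stream RDD should be updated to have the following value:
("2014-10-16", 2005000)
And so on...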
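Using streamRDD.transformToPair(...), it is possible to combine the streamRDD data with the batchRDD data via a join, but if we do this for every stream chunk, we add the counts from batchRDD to every stream chunk, "double counting" the state values, when the batch counts should only be added to the first stream chunk.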
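The double counting can be reproduced with plain arithmetic for a single date, using the numbers from the question. This sketch assumes the naive variant adds the batch count to each stream chunk before the running state absorbs it; the class and method names are hypothetical.

```java
// Plain-Java arithmetic for one date, contrasting two ways of folding the
// batch count into the streaming state. Assumption: the naive variant adds
// the batch count to every stream chunk before the running state absorbs it.
class DoubleCounting {
    // Baseline re-added on every chunk, so it ends up counted once per chunk.
    static long naive(long baseline, long[] chunkCounts) {
        long state = 0;
        for (long c : chunkCounts) {
            state += baseline + c;
        }
        return state;
    }

    // State holds only streamed counts; the baseline is added once when reading it.
    static long baselineOnce(long baseline, long[] chunkCounts) {
        long state = 0;
        for (long c : chunkCounts) {
            state += c;
        }
        return baseline + state;
    }

    public static void main(String[] args) {
        long[] chunks = {1000, 4000}; // the two stream batches from the question
        System.out.println(naive(2000000, chunks));        // 4005000 (baseline counted twice)
        System.out.println(baselineOnce(2000000, chunks)); // 2005000 (the expected value)
    }
}
```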
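To address this case, I would union the base RDD with the result of the aggregated StateDStream that keeps the totals of the streaming data. This effectively provides a baseline for the data reported on every streaming interval without counting that baseline x times: the baseline is added to each interval's output but is never fed back into the state.
I tried this idea with the WordCount example and it works. Drop this into the REPL for a live example (use nc -lk 9876 in a separate shell to provide input for the socketTextStream):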
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7 )
val defaultRdd = sc.parallelize(defaults)
@transient val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/spark")
val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) =>
Some(newValues.sum + runningCount.getOrElse(0))
}
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey( _+_ )
wordCount.print()
historicCount.print()
runningTotal.print()
ssc.start()
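The runningTotal line can be modeled with plain Java maps standing in for the RDDs. This is a sketch of the union-then-reduceByKey idea only, and the class and method names are hypothetical; the point it shows is that the defaults are added when each interval's output is built and are never written back into the state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of runningTotal = historicCount.union(defaultRdd).reduceByKey(_ + _)
// for one streaming interval. Maps stand in for the RDDs.
class RunningTotal {
    static Map<String, Integer> unionAndReduce(Map<String, Integer> state,
                                               Map<String, Integer> baseline) {
        // union keeps every pair from both sides; reduceByKey then sums per key,
        // so keys present on only one side pass through unchanged
        Map<String, Integer> total = new TreeMap<>(baseline);
        state.forEach((word, n) -> total.merge(word, n, Integer::sum));
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> defaults = Map.of("magic", 2, "face", 5, "dust", 7);
        Map<String, Integer> state = new HashMap<>();
        // Interval 1: the stream has seen "magic magic"
        state.merge("magic", 2, Integer::sum);
        System.out.println(unionAndReduce(state, defaults)); // {dust=7, face=5, magic=4}
        // Interval 2: the stream additionally sees "magic"; the defaults are
        // added again at read time but never stored in the state
        state.merge("magic", 1, Integer::sum);
        System.out.println(unionAndReduce(state, defaults)); // {dust=7, face=5, magic=5}
    }
}
```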
You could try updateStateByKey:
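import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits, needed on Spark 1.x

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    // Sums this batch's counts for a word and adds them to the previous running count
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    // stream
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
    ssc.checkpoint(".") // updateStateByKey requires a checkpoint directory
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc)
    stateWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}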
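For the Java version of the question, the same update function can be written with java.util.Optional in place of Scala's Option (the Spark 1.x Java API used in the question takes Guava's Optional instead, whose or(...) plays the role of orElse(...)). Starting the state for a date from its batch count is a hypothetical seed added here for illustration; the answer's code starts every key from an empty state.

```java
import java.util.List;
import java.util.Optional;

// Java version of updateFunc: new state = sum of this batch's values + previous state (or 0).
// Seeding the state with the batch count is a hypothetical illustration only.
class UpdateFunc {
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int currentCount = values.stream().mapToInt(Integer::intValue).sum();
        int previousCount = state.orElse(0);
        return Optional.of(currentCount + previousCount);
    }

    public static void main(String[] args) {
        Optional<Integer> state = Optional.of(2000000); // hypothetical seed: batch count for 2014-10-16
        state = update(List.of(1000), state);
        System.out.println(state.get()); // 2001000
        state = update(List.of(4000), state);
        System.out.println(state.get()); // 2005000
    }
}
```

Seeded this way, the state itself carries the combined total, so no per-interval union is needed; without a seed, the function only counts what arrives on the stream.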