我们正在尝试使用Spark Streaming和Spark SQL来实现一个用例,它允许我们对一些数据运行用户定义的规则(参见下面如何捕获和使用数据)。其思想是使用SQL指定规则,并将结果作为警报返回给用户。基于每个传入事件批执行查询似乎非常慢。如果有人能提出更好的方法来实现这个用例,我将不胜感激。此外,想知道Spark是否在驱动程序或工作程序上执行sql ?提前感谢。下面给出的是我们执行的步骤,以实现这一目标-
1)从外部数据库加载初始数据集作为JDBCRDD
JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);
2)创建一个传入DStream(捕获初始化数据的更新)
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);
JavaDStream<SomeState> incomingDStream = flumeStream.map(...);
3)使用传入的DStream
创建Pair DStreamJavaPairDStream<Object,SomeState> pairDStream =
incomingDStream.map(...);
4)从DStream pair中创建一个有状态的DStream,使用初始化的RDD作为基态
JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);
JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);
5)基于传入流
中的值对更新状态运行用户驱动的查询incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {
@Override
public Void call(JavaRDD<SomeState> events) throws Exception {
updatedStateRDD.count();
SQLContext sqx = new SQLContext(events.context());
schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
schemaDf.registerTempTable("TEMP_TABLE");
sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);
//collect the results and process and send alerts
...
}
);
第一步应该是确定哪一步花费的时间最多。请查看Spark Master UI并确定哪个步骤/阶段花费了大部分时间。
有几个最佳实践+我的观察,你可以考虑:-
- 使用Singleton SQLContext -参见示例- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
- updateStateByKey在大量键的情况下可能是内存密集型操作。您需要检查所处理的数据大小updateStateByKey函数以及它是否适合给定的记忆。
- 你的GC表现如何? 你真的在用"initialRDD"吗?如果没有,那么不要加载它。如果是静态数据集,则缓存它。
- 检查SQL查询所花费的时间。
这里有一些问题/领域可以帮助你
- DStreams的存储级别是什么?
- 集群大小和集群配置
- 版本的Spark?
最后- ForEachRDD是一个输出操作,它在驱动程序上执行给定的函数,但RDD可能会执行操作,这些操作在工作节点上执行。
您可能需要阅读这个更好地解释输出操作- http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
我也面临同样的问题,你能让我知道你是否有同样的解决方案吗?虽然我在下面的帖子中提到了详细的用例。
Spark SQL +窗口+流问题- Spark SQL查询在使用Spark流运行时执行时间长