Spark SQL + Streaming issues



我们正在尝试使用Spark StreamingSpark SQL来实现一个用例,它允许我们对一些数据运行用户定义的规则(参见下面如何捕获和使用数据)。其思想是使用SQL指定规则,并将结果作为警报返回给用户。基于每个传入事件批执行查询似乎非常慢。如果有人能提出更好的方法来实现这个用例,我将不胜感激。此外,想知道Spark是否在驱动程序或工作程序上执行sql ?提前感谢。下面给出的是我们执行的步骤,以实现这一目标-

1)从外部数据库加载初始数据集作为JDBCRDD

JDBCRDD<SomeState> initialRDD = JDBCRDD.create(...);

2)创建一个传入DStream(捕获初始化数据的更新)

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
            FlumeUtils.createStream(ssc, flumeAgentHost, flumeAgentPort);
JavaDStream<SomeState> incomingDStream = flumeStream.map(...);

3)使用传入的DStream

创建Pair DStream
JavaPairDStream<Object,SomeState> pairDStream =
            incomingDStream.map(...);

4)从DStream pair中创建一个有状态的DStream,使用初始化的RDD作为基态

JavaPairDStream<Object,SomeState> statefulDStream = pairDStream.updateStateByKey(...);
JavaRDD<SomeState> updatedStateRDD = statefulDStream.map(...);

5)基于传入流

中的值对更新状态运行用户驱动的查询
incomingStream.foreachRDD(new Function<JavaRDD<SomeState>,Void>() {
            @Override
            public Void call(JavaRDD<SomeState> events) throws Exception { 
                updatedStateRDD.count();
                SQLContext sqx = new SQLContext(events.context());
                schemaDf = sqx.createDataFrame(updatedStateRDD, SomeState.class);
                schemaDf.registerTempTable("TEMP_TABLE");
                sqx.sql(SELECT col1 from TEMP_TABLE where <condition1> and <condition2> ...);
                //collect the results and process and send alerts
                ...
            }
);

第一步应该是确定哪一步花费的时间最多。请查看Spark Master UI并确定哪个步骤/阶段花费了大部分时间。

有几个最佳实践+我的观察,你可以考虑:-

  1. 使用Singleton SQLContext -参见示例- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
  2. updateStateByKey在大量键的情况下可能是内存密集型操作。您需要检查所处理的数据大小updateStateByKey函数以及它是否适合给定的记忆。
  3. 你的GC表现如何?
  4. 你真的在用"initialRDD"吗?如果没有,那么不要加载它。如果是静态数据集,则缓存它。
  5. 检查SQL查询所花费的时间。

这里有一些问题/领域可以帮助你

  1. DStreams的存储级别是什么?
  2. 集群大小和集群配置
  3. 版本的Spark?

最后- ForEachRDD是一个输出操作,它在驱动程序上执行给定的函数,但RDD可能会执行操作,这些操作在工作节点上执行。

您可能需要阅读这个更好地解释输出操作- http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

我也面临同样的问题,你能让我知道你是否有同样的解决方案吗?虽然我在下面的帖子中提到了详细的用例。

Spark SQL +窗口+流问题- Spark SQL查询在使用Spark流运行时执行时间长

相关内容

  • 没有找到相关文章

最新更新