Spark SQL +窗口+流问题- Spark SQL查询在使用Spark流运行时执行时间长



我们期待使用Spark Streaming (with flume)和Spark SQL with windowing实现一个用例,允许我们对一组数据执行CEP计算。(请参见下文了解如何捕获和使用数据)。这个想法是使用SQL来执行一些符合特定条件的操作。基于每个传入事件批执行查询似乎非常慢(因为它在进行中)。

这里慢的意思是说我配置了600秒的窗口大小和20秒的批处理间隔(以每2秒1次输入的速度泵送数据),所以说在10分钟后,输入将是恒定的,它应该花费相同的时间来执行SQL查询。

但是在时间过去之后,它开始花费更多的时间并逐渐增加,所以对于大约300条记录,select count(*)查询最初花费1秒,后来在15分钟后开始花费2到3秒并逐渐增加。

如果有人能提出更好的方法来实现这个用例,我将不胜感激。下面给出的是我们执行的步骤,以实现这一目标-
    //Creating spark and streaming context
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, 20);
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream; = FlumeUtils.createStream(ssc, "localhost", 55555);
    //Adding the events on window
    JavaDStream<SparkFlumeEvent> windowDStream =
        flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);
    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
    {
        public Void call(JavaRDD<SparkFlumeEvent> eventsData)
        throws Exception
        {
            long t2 = System.currentTimeMillis();
            lTempTime = System.currentTimeMillis();
            JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
            {
                @Override
                public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
                {
                ...
                    return avevent;
                }
            });
            DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
            schemaevents.registerTempTable("avevents" + lTempTime);
            sqlContext.cacheTable("avevents" + lTempTime);
            // here the time taken by query is increasing gradually
            long t4 = System.currentTimeMillis();
            Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
            System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds n");
            sqlContext.dropTempTable("avevents"  + lTempTime);
            sqlContext.clearCache();
            return null;
        }
    });

例如,假设我们想要根据日志级别确定一段时间内的事件计数。在SQL中,我们会发出如下形式的查询:

SELECT level, COUNT(1) from ambari GROUP BY level

但是使用Scala数据框架API,你可以发出以下查询:

ambari.groupBy("level").count()
在这一点上,一些非常接近本地SQL的东西可以用于查询,如:
sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")

返回与DataFrame API中返回的相同的数据结构。返回的数据结构本身就是一个数据帧。

此时,没有发生任何执行:数据帧上的操作被映射到RDD上的适当操作(在本例中为 )。
RDD.groupBy(...).aggregateByKey(...))

我们可以通过对结果执行collect()来强制执行,将执行结果存入驱动程序内存。

相关内容

  • 没有找到相关文章

最新更新