如何并行化RDD工作时使用cassandra火花连接器的数据加



这是示例senario,我们在cassandra中有实时数据记录,我们希望聚合不同时间范围的数据。我写的代码如下:

 val timeRanges = getTimeRanges(report)
 timeRanges.foreach { timeRange =>
          val (timestampStart, timestampEnd) = timeRange
          val query = _sc.get.cassandraTable(report.keyspace, utilities.Helper.makeStringValid(report.scope)).
            where(s"TIMESTAMP > ?", timestampStart).
            where(s"VALID_TIMESTAMP <= ?", timestampEnd)
        ......do the aggregation work....

代码的问题是,对于每个时间范围,聚合工作都不是并行运行的。我的问题是如何将聚合工作并行化?既然RDD不能在另一个RDD或Future中运行?有没有办法将工作并行化,或者我们不能在这里使用火花连接器?

使用joinWithCassandraTable函数。这允许您使用来自一个RDD的数据来访问C*并提取记录,就像您的示例中一样。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#performing-自12年以来的效率-与分析表

joinWithCassandraTable利用java驱动程序执行单个查询源RDD所需的每个分区,因此不需要将请求或序列化数据。这意味着任何RDD和Cassandra表可以在不执行完整表的情况下执行扫描当在共享相同的分区键,这将不需要在机器。在所有情况下,此方法都将使用源RDD用于数据局部性的分区和放置。

最后,我们使用并集来连接每个RDD,并使它们并行化。

相关内容

  • 没有找到相关文章

最新更新