我有一个Cassandra表,其中复合分区键为(time_bucket时间戳,节点整数)。 time_bucket值是插入数据的时间,秒数转换为 00,节点值范围为 0 到 100
火花作业每分钟运行一次,从表中获取数据。该表包含近 2500 万条记录,每分钟添加一次记录。
如果我的 Spark 作业每次运行时都选择所有记录,则作业将在 2 分钟内完成。但是如果我使用:
的c.cassandraTable(keyspace_name,table_name).where("time_bucket = ? ", from).where("nodeid_bucket IN ? ", nodeid_bucket_range)
其中 val nodeid_bucket_range = 0 到 100,
作业需要 10 分钟才能完成。
我的群集有 6 个节点,正在使用 DSE 4.8.9。每个执行程序使用 8 个内核和 20GB 内存。增加这些值无助于加快火花作业。
知道为什么我的工作需要 10 分钟吗?使用 IN 子句时,spark-cassandra 不能很好地工作吗?
你可能想要joinWithCassandraTable
.几乎总是,如果有大量值,则最好通过执行联接来提供In
子句。这将在不同的执行器上并行执行您的所有请求。
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable
IN
语句翻译成key OR key OR key3 ... OR key100
,这对于优化器做出有用的东西来说是非常低效的。在您的情况下,您可以使用:
sc.cassandraTable(keyspace_name,table_name).where("time_bucket = ? ", from).where("nodeid_bucket > ? AND nodeid_bucket < ? ", nodeid_bucket_range)
观察范围的边缘,当然,这假设您的范围是连续的。