我目前正在使用这个堆栈:
- Cassandra 2.2(多节点(
- 火花/流1.4.1
- Spark Cassandra连接器1.4.0-M3
我有这个DStream[Ids],其中RDD的元素数约为6000-7000。id
是分区密钥。
val ids: DStream[Ids] = ...
ids.joinWithCassandraTable(keyspace, tableName, joinColumns = SomeColumns("id"))
随着tableName
变得越来越大,比如说大约30k个"行",查询需要更长的时间,并且我很难保持在批处理持续时间阈值以下。它的性能类似于使用大量的IN
-子句,我已经理解这是不可取的。
有没有更有效的方法可以做到这一点?
答案:在使用Cassandra进行连接之前,请始终记住使用repartitionByCassandraReplica
重新分区本地RDD,以确保每个分区仅针对本地Cassandra节点工作。在我的情况下,我还必须在连接的本地RDD/DStream上增加分区,以便任务在工作人员之间均匀分布。
"id"是表中的分区键吗?如果没有,我认为这是必要的,否则你可能正在进行表格扫描,随着表格越来越大,扫描速度会越来越慢。
此外,为了使用此方法获得良好的性能,我认为您需要在ids RDD上使用repartitionByCassandraReplica((操作,以便连接是每个节点上的本地操作。
看看这个。