我正在使用带有Spark 1.2.1的cassandra 2.1.5(.469)。
我在大 C* 表(2,034,065,959 行)上使用 Spark 执行了迁移作业 - 将其迁移到另一个架构表 (new_table),使用:
some_mapped_rdd.saveToCassandra("keyspace", "new_table", writeConf=WriteConf(parallelismLevel = 50))
我可以在OpsCenter/Activity中看到C*在new_table上做一些压缩任务,并且持续了几天。
此外,我正在尝试运行另一个作业,而压缩任务仍在进行中,使用:
//join with cassandra
val rdd = some_array.map(x => SomeClass(x._1,x._2)).joinWithCassandraTable(keyspace, some_table)
//get only the jsons and create rdd temp table
val jsons = rdd.map(_._2.getString("this"))
val jsonSchemaRDD = sqlContext.jsonRDD(jsons)
jsonSchemaRDD.registerTempTable("this_json")
而且它比平时需要更长的时间(通常我不执行巨大的迁移任务)才能完成。
那么 C* 中的压缩过程会影响 Spark 作业吗?
编辑:
我的表配置为 SizeTieredCompactionStrategy(默认)压缩策略,我有 2882~ 的 20M~(和更小,在 1 个节点中的 3 个节点上)SSTable 文件,所以我想我应该将 compaction_throughput_mb_per_sec 参数更改为更高的值并采用 DateTieredCompactionStrategy 压缩策略,因为我的数据是时间序列数据。
就可能使用大量系统资源的压缩而言,从性能的角度来看,它可能会影响您的 Spark 作业。 您可以通过compaction_throughput_mb_per_sec控制一次可以执行多少吞吐量压缩。
另一方面,降低压实吞吐量会使压实需要更长的时间才能完成。
此外,正在发生压缩的事实可能意味着数据在 sstable 之间的分布方式不是最佳的。 因此,压缩可能是问题的症状,但不是实际问题。 事实上,它可能是你问题的解决方案(随着时间的推移,它会取得更多进展)。
我建议查看您正在查询的表的 cfhistogram 输出,以查看每次读取命中了多少 SSTable。 这可能是一个很好的指标,表明某些东西不是最佳的 - 例如需要更改您的配置(即可记忆的冲洗率)或优化或更改您的压实策略。
这个答案很好地解释了如何读取 cfhistogram 输出。