为什么 cassandra 在执行查询时使用 "allow filtering" 计数查询,而在我的代码中没有提及它?



我正在使用spark-sql-2.4.1,spark-cassandra-connector_2.11-2.4.1和java8。

我正在执行以下简单查询以获取C *表行数。

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
long recCount = javaFunctions(sc).cassandraTable(keyspace, columnFamilyName).cassandraCount();

但它因以下错误而超时。

java.io.IOException: Exception during execution of SELECT count(*) FROM "radata"."model_vals" WHERE token("model_id", "type", "value", "code") > ? AND token("model_id", "type", "value", "code") <= ?   ALLOW FILTERING: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:350)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
Caused by: com.datastax.driver.core.exceptions.ReadTimeoutException: Cassandra timeout during read query at consistency LOCAL_ONE (1 responses were required but only 0 replica responded)

我正在使用具有以下设置的Cassandra 6节点集群:

cassandra.output.consistency.level=ANY
cassandra.concurrent.writes=1500
cassandra.output.batch.size.bytes=2056
cassandra.output.batch.grouping.key=partition 
cassandra.output.batch.grouping.buffer.size=3000
cassandra.output.throughput_mb_per_sec=128
cassandra.connection.keep_alive_ms=30000
cassandra.read.timeout_ms=600000

1(为什么在解释之前附加"允许过滤" 实际执行它 ?

2(甚至认为我设置了"cassandra.output.consistency.level=ANY"为什么是 使用"一致性LOCAL_ONE"执行?

如何解决这些问题?

  1. ALLOW FILTER由Spark-Cassandra连接器隐式添加到生成的CQL查询中。

  2. 一致性级别是按查询设置的,并在 Spark 端设置。您可以使用

spark.cassandra.input.consistency.level=ANY

火花配置上。

但是改变一致性级别对你没有帮助,因为你只有一个 Cassandra 节点没有响应。我建议你的表非常大,花费 Cassandra 计算计数的时间比任何超时参数都多。可以在客户端上为每个查询设置此参数。在您的情况下,您可以查看Spark-Cassandra连接器配置:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md

另一种方法是在 Spark 端计算计数并输入 .count(( 而不是 .cassandraCount((。根据我的经验,我建议避免在生产中在Cassandra方面进行任何聚合。特别是,当你使用Spark - 为此类任务设计的框架时。

最新更新