Apache Spark + cassandra+Java + Spark 会话过滤器记录基于给定的 from 和 to



我正在做一个Spring Java项目,并使用Datastax连接器集成Apache Spark和cassandra。

我已经自动连接了火花会话,下面的代码行似乎有效。

Map<String, String> configMap = new HashMap<>();
configMap.put("keyspace", "key1");
configMap.put("table", tableName.toLowerCase());
Dataset<Row> ds = sparkSession.sqlContext().read().format("org.apache.spark.sql.cassandra").options(configMap)
.load();
ds.show();

在上面的步骤中,我正在加载数据集,在下面的步骤中,我正在对日期时间字段进行过滤。

String s1 = "2020-06-23 18:51:41";
String s2 = "2020-06-23 18:52:21";
Timestamp from = Timestamp.valueOf(s1);
Timestamp to = Timestamp.valueOf(s2);
ds = ds.filter(df.col("datetime").between(from, to));

是否可以在加载本身期间应用此过滤条件。如果是这样,有人可以建议我如何做到这一点吗?

提前谢谢。

您不必在此处显式执行任何操作,spark-cassandra-connector 具有谓词下推,因此您的过滤条件将在数据选择期间应用。

来源: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

连接器将自动将所有有效谓词下推到 Cassandra。数据源还将自动仅从 Cassandra 中选择完成查询所需的列。这可以使用explain命令进行监视。

仅当您要对其进行筛选的列是第一个聚类分析列时,才会有效地下推此筛选器。 正如 Rayan 所指出的,我们可以在数据集上使用explain命令来检查谓词下推是否发生 - 相应的谓词附近应该有*个字符,如下所示:

val dcf3 = dc.filter("event_time >= cast('2019-03-10T14:41:34.373+0000' as timestamp) 
AND event_time <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")
// dcf3.explain
// == Physical Plan ==
// *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [uuid#21,event_time#22,id#23L,value#24] 
// PushedFilters: [ *GreaterThanOrEqual(event_time,2019-03-10 14:41:34.373), *LessThanOrE..., 
// ReadSchema: struct<uuid:string,event_time:timestamp,id:bigint,value...

如果不推送谓词,当在 Spark 级别进行筛选时,我们将在扫描后看到一个额外的步骤。

最新更新