当我们使用 spark 从 csv for DB 读取数据如下时,它会自动将数据拆分为多个分区并发送给执行器
spark
.read
.option("delimiter", ",")
.option("header", "true")
.option("mergeSchema", "true")
.option("codec", properties.getProperty("sparkCodeC"))
.format(properties.getProperty("fileFormat"))
.load(inputFile)
目前,我有一个ID列表:
[1,2,3,4,5,6,7,8,9,...1000]
我想做的是将此列表拆分为多个分区并发送给执行器,在每个执行器中,运行 sql 作为
ids.foreach(id => {
select * from table where id = id
})
当我们从 cassandra 加载数据时,连接器将生成查询 sql:
select columns from table where Token(k) >= ? and Token(k) <= ?
这意味着,连接器将扫描整个数据库,实际上,我不需要扫描整个表,我只需要从 id 列表中的 k(分区键(表中获取所有数据。
表架构为:
CREATE TABLE IF NOT EXISTS tab.events (
k int,
o text,
event text
PRIMARY KEY (k,o)
);
或者如何使用 Spark 使用预定义的 SQL 语句从 Cassandra 加载数据而不扫描整个表?
您只需要使用joinWithCassandra
函数来执行操作所需的数据选择。 但请注意,此功能只能通过RDD API使用。
像这样:
val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")
您需要确保数据帧中的列名与 Cassandra 中的分区键名称匹配 - 有关详细信息,请参阅文档。
DataFrame 实现仅在 DSE 版本的 Spark Cassandra Connector 中可用,如以下博客文章中所述。
2020 年 9 月更新:Spark Cassandra 连接器 2.5.0 中添加了对加入 Cassandra 的支持