如何将列表拆分为多个分区并发送给执行程序



当我们使用 spark 从 csv for DB 读取数据如下时,它会自动将数据拆分为多个分区并发送给执行器

spark
  .read
  .option("delimiter", ",")
  .option("header", "true")
  .option("mergeSchema", "true")
  .option("codec", properties.getProperty("sparkCodeC"))
  .format(properties.getProperty("fileFormat"))
  .load(inputFile)

目前,我有一个ID列表:

[1,2,3,4,5,6,7,8,9,...1000]

我想做的是将此列表拆分为多个分区并发送给执行器,在每个执行器中,运行 sql 作为

ids.foreach(id => {    
select * from table where id = id
})

当我们从 cassandra 加载数据时,连接器将生成查询 sql:

select columns from table where Token(k) >= ? and Token(k) <= ? 
这意味着,连接器将扫描整个数据库,实际上,我

不需要扫描整个表,我只需要从 id 列表中的 k(分区键(表中获取所有数据。

表架构为:

CREATE TABLE IF NOT EXISTS tab.events (
    k int,
    o text,
    event text
    PRIMARY KEY (k,o)
);

或者如何使用 Spark 使用预定义的 SQL 语句从 Cassandra 加载数据而不扫描整个表?

您只需要使用joinWithCassandra函数来执行操作所需的数据选择。 但请注意,此功能只能通过RDD API使用。

像这样:

val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab","events")

您需要确保数据帧中的列名与 Cassandra 中的分区键名称匹配 - 有关详细信息,请参阅文档。

DataFrame 实现仅在 DSE 版本的 Spark Cassandra Connector 中可用,如以下博客文章中所述。

2020 年 9 月更新:Spark Cassandra 连接器 2.5.0 中添加了对加入 Cassandra 的支持

相关内容

  • 没有找到相关文章

最新更新