Spark Cassandra,如何基于查询获取数据



我有一个Cassandra表,它非常大,现在我用下面的代码建立了Cassandra连接。

import pandas as pd
import numpy as np
from pyspark import *
import os
from pyspark.sql import SQLContext

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages  com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
table_df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(table='movies', keyspace='movie_lens')
.load()

主键是Movie_id,它是一个整数。load((将整个表加载到内存中,我想避免这种情况。我得到的一种方法是使用过滤器

table_df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(table='movies', keyspace='movie_lens')
.load()
.filter("movie_id = 37032")

但过滤器实际上阻止了将整个表加载到内存中吗?或者先加载然后过滤。此外,我还要查询许多身份证。比方说,我需要1000个身份证,而身份证每天都在变化。那该怎么做呢?

是的,Spark Cassandra连接器将执行所谓的"谓词下推";如果您在分区键上执行查询,并且只加载特定查询中的数据(.load函数只加载元数据,实际的数据加载将在您真正需要数据来执行操作时第一次发生(。关于Spark Cassandra连接器中何时发生谓词下推,有详细的规则记录。您也可以通过运行table_df.explain()来检查这一点,并查找标有星号*的过滤器的PushedFilters部分。

如果需要查找多个ID,则可以使用.isin过滤器,但不建议使用Cassandra。最好创建一个带有ID的数据帧,并使用Cassandra数据帧执行所谓的Direct Join(自SCC 2.5起,数据帧可用,RDD可用(。我有一篇关于加入Cassandra 数据的长篇博客文章

最新更新