如何使用pyspark执行CQL查询



我想用PySpark执行Cassandra CQL查询。但我找不到执行它的方法。我可以将整个表加载到数据帧中,创建Tempview并查询它。

df = spark.read.format("org.apache.spark.sql.cassandra").
options(table="country_production2",keyspace="country").load()
df.createOrReplaceTempView("Test")

请建议任何更好的方法,以便我可以在PySpark中执行CQL查询。

Spark SQL不直接支持Cassandra的cql方言。它只允许您将表加载为数据帧并对其进行操作。

如果您关心读取整个表来查询它,那么您可以使用下面给出的过滤器,让Spark推送谓词,只加载您需要的数据。

from pyspark.sql.functions import *
df = spark.read
.format("org.apache.spark.sql.cassandra")
.options(table=table_name, keyspace=keys_space_name)
.load()
.filter(col("id")=="A")
df.createOrReplaceTempView("Test")

在pyspark中,您使用的是SQL,而不是CQL。如果SQL查询在某种程度上与CQL匹配,即,您按分区或主键进行查询,那么Spark Cassandra Connector(SCC(将把查询转换为CQL,并执行(所谓的谓词下推(。如果不匹配,Spark将通过SCC加载所有数据,并在Spark级别进行过滤。

因此,在您注册了临时视图后,您可以执行以下操作:

val result = spark.sql("select ... from Test where ...")

并在CCD_ 3变量中处理结果。要检查谓词是否发生下推,请执行result.explain(),并在PushedFilters部分的条件中检查*标记。

相关内容

  • 没有找到相关文章

最新更新