Querying a Cassandra table in Databricks with the Python Cassandra driver



I am trying to optimize a way of querying a Cassandra table while working in Databricks. I read this article https://medium.com/@yoke_techworks/cassandra-and-pyspark-5d7830512f19, in which the author suggests querying the Cassandra table one row at a time and merging each result.

My attempt, using the Python Cassandra driver, is:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import pandas as pd

def init_cassandra_session(endpoints, keyspace, username, password, port=9042):
    auth_provider = PlainTextAuthProvider(username, password)
    cluster = Cluster(endpoints, port=port, auth_provider=auth_provider)
    cassandra_session = cluster.connect(keyspace, wait_for_all_pools=False)
    return cassandra_session

def get_rdd_values(rows):
    out_df = None
    cassandra_session = init_cassandra_session(host, keyspace, username, password)
    for row in rows:
        device_id = row.__getitem__('device_id')
        timestamp = row.__getitem__('timestamp')
        category = row.__getitem__('category')
        query = '''
        select * from headcounter_category_h_aggr where device_id = '%s' and timestamp = '&s' and category = '%s'
        '''
        result_query = cassandra_session.execute(query, [device_id, timestamp, category])
        if out_df is None:
            out_df = result_query
        else:
            out_df = out_df.append(result_query)
    return out_df

columns = ['device_id', 'timestamp', 'category']
data = [['SIMUL_TEST03', '2020-12-23 11:00:00', 'PERSON'], ['SIMUL_TEST03', '2020-12-23 12:00:00', 'PERSON']]
pdf = pd.DataFrame(data, columns=columns)
dfFromData1 = spark.createDataFrame(pdf)
rdd_values = dfFromData1.rdd.mapPartitions(get_rdd_values)
rdd_values.collect()

When I try to collect the results, rdd_values appears to be a NoneType, so it is not iterable.

I cannot find the mistake I made.

EDIT: I solved it by changing the get_rdd_values() function as follows:

def get_rdd_values(rows):
    out_df = []
    cassandra_session = init_cassandra_session(host, keyspace, username, password)
    for row in rows:
        device_id = row.__getitem__('device_id')
        timestamp = row.__getitem__('timestamp')
        category = row.__getitem__('category')
        query = f"select * from headcounter_category_h_aggr where device_id = '{device_id}' and timestamp = '{timestamp}' and category = '{category}'"
        result_query = cassandra_session.execute(query)
        if len(out_df) == 0:
            out_df = result_query
        else:
            out_df = out_df.append(result_query)
    return out_df

But now it seems to run the same query twice, or at least out_df ends up containing two identical elements.
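As an aside (not part of the original post): mixing a list, a driver ResultSet, and reassignment of append's return value makes this accumulation fragile, since list.append mutates in place and returns None. A minimal sketch that instead extends a plain Python list with the rows of each ResultSet, assuming host, keyspace, username and password are defined as in the driver code above:

def get_rdd_values(rows):
    # Accumulate result rows in a plain list; extend() mutates in place
    # and returns None, so its return value must not be reassigned.
    out_rows = []
    cassandra_session = init_cassandra_session(host, keyspace, username, password)
    for row in rows:
        # The Python driver uses %s placeholders with a parameter tuple,
        # which also avoids hand-quoting values into the CQL string.
        query = "select * from headcounter_category_h_aggr where device_id = %s and timestamp = %s and category = %s"
        result_query = cassandra_session.execute(
            query, (row['device_id'], row['timestamp'], row['category']))
        out_rows.extend(result_query)  # flatten the ResultSet into the list
    return out_rows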

EDIT 2 and solution:

After some more attempts, I found that creating a Spark dataframe directly from the RDD removes the duplicated rows. The code is:

dfFromRDD = spark.createDataFrame(rdd_values, schema = schema)
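The schema variable is not shown above; a hypothetical definition covering just the three lookup columns, for illustration, could be:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema for illustration only; the real
# headcounter_category_h_aggr table presumably has more columns.
schema = StructType([
    StructField('device_id', StringType(), True),
    StructField('timestamp', StringType(), True),
    StructField('category', StringType(), True),
])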

You shouldn't do it this way. Instead, you need to use the Spark Cassandra Connector, which provides native access to Cassandra from Spark through the DataFrame API (documented for PySpark as well). You just need to install a version that matches your Databricks runtime (on Databricks you need to use the assembly version, for the reasons described here), and then you can query Cassandra easily, like this:

df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(table="table_name", keyspace="ks_name")
      .load())

Or integrate it with the Spark catalogs, like this:

spark.conf.set("spark.sql.catalog.myCatalog",
               "com.datastax.spark.connector.datasource.CassandraCatalog")
df = spark.read.table("myCatalog.myKs.myTab")
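In either case the connector also needs to know how to reach the cluster; these settings are normally supplied through the Spark configuration. A short sketch using the connector's standard setting names (the host and credentials here are placeholders):

spark.conf.set("spark.cassandra.connection.host", "cassandra-host")  # placeholder host
spark.conf.set("spark.cassandra.connection.port", "9042")
spark.conf.set("spark.cassandra.auth.username", "username")          # placeholder credentials
spark.conf.set("spark.cassandra.auth.password", "password")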

The Spark Cassandra Connector will perform predicate pushdown when possible (for example, when you query by partition key).
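Applied to the use case in the question, a sketch of what that looks like, assuming device_id, timestamp, and category make up the table's primary key (ks_name is a placeholder keyspace):

df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(table="headcounter_category_h_aggr", keyspace="ks_name")
      .load())

result = df.filter(
    (df.device_id == 'SIMUL_TEST03') &
    (df.timestamp == '2020-12-23 11:00:00') &
    (df.category == 'PERSON')
)
result.explain()  # the plan shows which filters were pushed down to Cassandra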

If you need to join your dataset with a Cassandra table, you can follow the instructions on using the so-called direct join outlined in the blog post below.
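A sketch of what a direct join could look like, assuming the connector's Spark extensions are enabled and the join columns cover the table's full partition key (the table and keyspace names are placeholders from this question; on Databricks the extensions setting usually goes into the cluster's Spark config rather than the session builder):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.extensions",
                 "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

cassandra_df = (spark.read
                .format("org.apache.spark.sql.cassandra")
                .options(table="headcounter_category_h_aggr", keyspace="ks_name")
                .load())

# Joining a small DataFrame of keys against the Cassandra table on the full
# partition key lets the connector turn the join into direct per-partition
# reads instead of a full table scan.
joined = dfFromData1.join(cassandra_df,
                          on=['device_id', 'timestamp', 'category'])
joined.explain()  # a direct join shows up as a Cassandra Direct Join node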
