如何使用 SparkSession in Dataframe 使用 Spark-Cassandra-connector



我正在使用pysparkspark-cassandra-connector_2.11-2.3.0.jar与cassandra DB。 我正在从一个密钥空间读取数据帧并写入另一个不同的密钥空间。这两个密钥空间具有不同的用户名和密码。

我使用以下方法创建了 sparkSession:

spark_session = None
def set_up_spark(sparkconf,config):
"""
sets up spark configuration and create a session
:return: None
"""
try:
logger.info("spark conf set up Started")
global spark_session
spark_conf = SparkConf()
for key, val in sparkconf.items():
spark_conf.set(key, val)
spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
logger.info("spark conf set up Completed")
except Exception as e:
raise e

我使用此 SparkSession 将数据作为数据帧读取为:

table_df = spark_session.read 
.format("org.apache.spark.sql.cassandra") 
.options(table=table_name, keyspace=keyspace_name) 
.load()

我能够使用上述会话读取数据。 spark_session附加到上述查询。

现在我需要创建另一个会话,因为写入表的凭据不同。 我的写入查询为:

table_df.write 
.format("org.apache.spark.sql.cassandra") 
.options(table=table_name, keyspace=keyspace_name) 
.mode("append") 
.save()

我找不到如何在 cassandra 中为上述写入操作附加新的 sparkSession。

如何使用 spark-cassandra-connector 在 pyspark 中附加新的 SparkSession 以进行写入操作?

您可以简单地将该信息作为选项传递给特定readwrite操作,这包括以下内容:spark.cassandra.connection.host

请注意,您需要将这些选项放入字典中,并传递此字典,而不是直接传递,如文档中所述。

read_options = { "table": "..", "keyspace": "..", 
"spark.cassandra.connection.host": "IP1", 
"spark.cassandra.auth.username": "username1", 
"spark.cassandra.auth.password":"password1"}
table_df = spark_session.read 
.format("org.apache.spark.sql.cassandra") 
.options(**read_options) 
.load()
write_options = { "table": "..", "keyspace": "..", 
"spark.cassandra.connection.host": "IP2", 
"spark.cassandra.auth.username": "username2", 
"spark.cassandra.auth.password":"password1"}
table_df.write 
.format("org.apache.spark.sql.cassandra") 
.options(**write_options) 
.mode("append") 
.save()

最新更新