我有两个cassandra集群。如何在同一SparkSession中设置主机、密码和用户?我该如何将它们与CassandraConnect配合使用?
我试过这个:
val cassandraCon: CassandraConnector = CassandraConnector(conf)
val ks = "monitore"
val ttableName = "validate_structure"
def getIndex(): ResultSet = {
val table = ks + "." + ttableName
val query = s"""select *
|from ${table}""".stripMargin
println(query)
cassandraCon.withSessionDo(s => {
s.execute(query)
})
}
然而,问题是,只有当cassandra集群与spark在同一主机上时,这才有效。我还试图创建一个目录,但我找不到用session.execute而不是spark.sql.发出请求的方法
有人能帮我吗?我使用Spark Structured Streaming 3.1.2,并使用cassandra连接来丰富我的数据。
第一个问题是.withSessionDo
只在驱动程序的上下文中运行,而不是在执行器的上下文中,因此它不会被分发。
您需要使用:
- RDD API中的
.joinWithCassandraTable
函数(还有它的左联接版本( - 或者使用所谓的DirectJoin(详见作者的博客文章(,当Spark Cassandra Connector(SCC(检测到连接的一侧在Cassandra中,并将其转换为对单个分区的查询时。不幸的是,Spark 3.1破坏了SCC中DirectJoin的当前版本(请参阅JIRA(,因此您可能需要使用RDD API直到修复为止
我有一篇关于如何在Cassandra中使用数据执行高效联接的详细博客文章。
关于这两个集群,完全可以为单独的读/写操作指定连接细节,只需指定.option("spark.cassandra.connection.host", "host-or-ip")
即可。(SCC的主要开发人员Russell Spitzer在博客上发布了关于如何连接到多个集群的文章(。当您使用Catalog API时,也可以这样做——只需将连接属性名称(如spark.cassandra.connection.host
(附加到特定的目录名称(请参阅文档(