如何在我的spark结构化流中获得两个不同的cassandra集群



我有两个cassandra集群。如何在同一SparkSession中设置主机、密码和用户?我该如何将它们与CassandraConnect配合使用?

我试过这个:

val cassandraCon: CassandraConnector = CassandraConnector(conf)
val ks = "monitore"
val ttableName = "validate_structure"
def getIndex(): ResultSet = {
val table = ks + "." + ttableName
val query = s"""select *
|from ${table}""".stripMargin
println(query)
cassandraCon.withSessionDo(s => {
s.execute(query)
})
}

然而,问题是,只有当cassandra集群与spark在同一主机上时,这才有效。我还试图创建一个目录,但我找不到用session.execute而不是spark.sql.发出请求的方法

有人能帮我吗?我使用Spark Structured Streaming 3.1.2,并使用cassandra连接来丰富我的数据。

第一个问题是.withSessionDo只在驱动程序的上下文中运行,而不是在执行器的上下文中,因此它不会被分发。

您需要使用:

  • RDD API中的.joinWithCassandraTable函数(还有它的左联接版本(
  • 或者使用所谓的DirectJoin(详见作者的博客文章(,当Spark Cassandra Connector(SCC(检测到连接的一侧在Cassandra中,并将其转换为对单个分区的查询时。不幸的是,Spark 3.1破坏了SCC中DirectJoin的当前版本(请参阅JIRA(,因此您可能需要使用RDD API直到修复为止

我有一篇关于如何在Cassandra中使用数据执行高效联接的详细博客文章。

关于这两个集群,完全可以为单独的读/写操作指定连接细节,只需指定.option("spark.cassandra.connection.host", "host-or-ip")即可。(SCC的主要开发人员Russell Spitzer在博客上发布了关于如何连接到多个集群的文章(。当您使用Catalog API时,也可以这样做——只需将连接属性名称(如spark.cassandra.connection.host(附加到特定的目录名称(请参阅文档(

最新更新