这是正确的-Spark Cassandra连接器只能用作流接收器,而不能用作流源。
数据源org.apache.spark.sql.cassandra不支持流式读取
val spark = SparkSession
.builder()
.appName("SparkCassandraApp")
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port", "9042")
.config("spark.cassandra.auth.username", "xxxxx")
.config("spark.cassandra.auth.password", "yyyyy")
.master("local[*]")
.getOrCreate();
val tableDf3 = spark.**readStream**
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "aaaaa", "keyspace" -> "bbbbb"))
.load()
.filter("deviceid='XYZ'")
tableDf3.show(10)
如果你想从Cassandra获得更改,那么这是一项相当复杂的任务,取决于Cassandra的版本(它是否实现CDC(和其他因素。
对于Spark,您可以通过定期重新读取数据来实现某种流式传输,使用时间戳列过滤掉您已经读取的数据。您可以在以下答案中找到有关该方法的更多信息。