我正在使用Spark Structured Streaming实时用于机器学习目的,我想将预测存储在我的Cassandra集群中。
由于我处于流上下文中,每秒多次执行相同的请求,因此一个强制性的优化是使用 PreparedStatement。
在Cassandra Spark驱动程序(https://github.com/datastax/spark-cassandra-connector(中,没有办法使用PreparedStatement(在scala或python中,我不考虑使用java作为选项(
我应该使用 scala (https://github.com/outworkers/phantom(/python (https://github.com/datastax/python-driver( cassandra 驱动程序吗?那么它是如何工作的,我的连接对象需要可序列化才能传递给worker?
如果有人可以帮助我!
谢谢:)
为了执行准备好的语句,然后在使用结构化 Spark 流处理流时在 Cassandra 中注册数据,您需要:
- import com.datastax.driver.core.Session
- import com.datastax.spark.connector.cql.CassandraConnector
然后,构建连接器:
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
同时拥有会话和连接器,您现在可以调用您在语句 scala 类中编写的准备好的语句函数
connector.withSessionDo { session =>
Statements.PreparedStatement()
}
最后,您可以使用下面的函数在 Cassandra 中写入数据,cql 是将变量绑定到准备好的语句并执行它的函数:
private def processRow(value: Commons.UserEvent) = {
connector.withSessionDo { session =>
session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
}
}
当然,你必须在foreach编写器中调用这个函数(processRow(。
// This Foreach sink writer writes the output to cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
override def open(partitionId: Long, version: Long) = true
override def process(value: Commons.UserEvent) = {
processRow(value)
}
override def close(errorOrNull: Throwable) = {}
}
val query =
ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start