Spark (SQL / Structured Streaming) Cassandra - PreparedState



我正在使用Spark Structured Streaming实时用于机器学习目的,我想将预测存储在我的Cassandra集群中。

由于我处于流上下文中,每秒多次执行相同的请求,因此一个强制性的优化是使用 PreparedStatement。

在Cassandra Spark驱动程序(https://github.com/datastax/spark-cassandra-connector(中,没有办法使用PreparedStatement(在scala或python中,我不考虑使用java作为选项(

我应该使用 scala (https://github.com/outworkers/phantom(/python (https://github.com/datastax/python-driver( cassandra 驱动程序吗?那么它是如何工作的,我的连接对象需要可序列化才能传递给worker?

如果有人可以帮助我!

谢谢:)

为了执行准备好的语句,然后在使用结构化 Spark 流处理流时在 Cassandra 中注册数据,您需要:

  • import com.datastax.driver.core.Session
  • import com.datastax.spark.connector.cql.CassandraConnector

然后,构建连接器:

 val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf) 

同时拥有会话连接器,您现在可以调用您在语句 scala 类中编写的准备好的语句函数

 connector.withSessionDo { session =>
 Statements.PreparedStatement()

}

最后,您可以使用下面的函数在 Cassandra 中写入数据,cql 是将变量绑定到准备好的语句并执行它的函数:

  private def processRow(value: Commons.UserEvent) = {
  connector.withSessionDo { session =>
  session.execute(Statements.cql(value.device_id, value.category, value.window_time, value.m1_sum_downstream, value.m2_sum_downstream))
}

}

当然,你必须在foreach编写器中调用这个函数(processRow(。

     // This Foreach sink writer writes the output to cassandra.
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[Commons.UserEvent] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: Commons.UserEvent) = {
    processRow(value)
  }
  override def close(errorOrNull: Throwable) = {}
}
val query =
  ds.writeStream.queryName("aggregateStructuredStream").outputMode("complete").foreach(writer).start

相关内容

  • 没有找到相关文章

最新更新