我了解到使用RichSinkFunction
相对于直接调用DB方法的优势。因此,我决定写我自己的RichSinkFunction
。
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import com.datastax.driver.core.{Session, Cluster}
class CassandraAsSink extends RichSinkFunction {
override def open(parameters: Configuration): Unit = {
val cluster = Cluster.builder().addContactPoint("localhost").build()//
val session = cluster.connect("example")
}
override def invoke(value: Nothing, context: SinkFunction.Context): Unit = {
session.execute(
s"""
INSERT INTO users (name, credits, user_id)
VALUES ($name, $credits, $userId)
"""
)
}
override def close(): Unit = {
//something like session.close()
}
}
然而,我无法完全开发它。我想在一个单独的类下调用这个方法,该类应该传递我想在代码中输入的3个参数。该记录采用JSON
格式。我可以通过解析和获取属性来管理它。但是我如何将它传递给invoke
方法,以及如何在整个类中传递session
对象。另外,由于我是Flink和Scala的新手,这是一种正确的方式吗?
stream/string.new CassandraAsSink().invoke(name,credits,user_id
(在调用部分工作吗?
Modified:
class CassandraSink extends RichSinkFunction[String] {
var cluster: Cluster = _
var session: Session = _
println("inside....")
override def open(parameters: Configuration): Unit = {
cluster = Cluster.builder().addContactPoint("localhost").build() //
session = cluster.connect("example")
println("Connected....")
}
override def invoke(value: String): Unit = {
println("inside invoke: " + value)
session.execute(
s"""
INSERT INTO jsondata1(records_b)
VALUES ($value)
"""
)
}
override def close(): Unit = {
session.close()
println("Session Closed...")
//something like session.close()
}
}
Calling part
:
val datastreamFromString:DataStream[String]=env.fromElements(data) // where data is string
datastreamFromString.addSink(new CassandraAsSink())
我发现从String
创建的DataStream
有一些问题。这个班工作得很好。我已经将env
变量初始化为类中的第二行。
Flink已经有一个Cassandra接收器;它有一些您从未尝试过支持的宝贵功能,尤其是检查点。
关于您的问题:
您可以将session
设置为成员变量,该变量可以在open
中初始化并在invoke
中使用。
Flink将为进入接收器的每个流记录调用invoke
方法。此记录作为value
参数传递给调用。您需要从该值中提取name等字段。
你需要把水槽连接到你的工作图上;总的来说,它最终会变成这样:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.addSource(source)
... // some processing
.addSink(new CassandraAsSink())
env.execute()
顺便说一句,Flink文档中包含了一些培训课程,其中包括示例和练习,可以帮助您入门。