RichSinkFunction for Cassandra in Flink



我了解到使用RichSinkFunction相对于直接调用DB方法的优势。因此,我决定写我自己的RichSinkFunction

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import com.datastax.driver.core.{Session, Cluster}
class CassandraAsSink extends RichSinkFunction {
override def open(parameters: Configuration): Unit = {
val cluster = Cluster.builder().addContactPoint("localhost").build()//
val session = cluster.connect("example")
}
override def invoke(value: Nothing, context: SinkFunction.Context): Unit = {
session.execute(
s"""
INSERT INTO users (name, credits, user_id)
VALUES ($name, $credits, $userId)
"""
)
}
override def close(): Unit = {
//something like session.close()
}
}  

然而,我无法完全开发它。我想在一个单独的类下调用这个方法,该类应该传递我想在代码中输入的3个参数。该记录采用JSON格式。我可以通过解析和获取属性来管理它。但是我如何将它传递给invoke方法,以及如何在整个类中传递session对象。另外,由于我是Flink和Scala的新手,这是一种正确的方式吗?

stream/string.new CassandraAsSink().invoke(name,credits,user_id(在调用部分工作吗?

Modified:

class CassandraSink extends RichSinkFunction[String] {
var cluster: Cluster = _
var session: Session = _
println("inside....")
override def open(parameters: Configuration): Unit = {
cluster = Cluster.builder().addContactPoint("localhost").build() //
session = cluster.connect("example")
println("Connected....")
}
override def invoke(value: String): Unit = {
println("inside invoke: " + value)
session.execute(
s"""
INSERT INTO jsondata1(records_b)
VALUES ($value)
"""
)
}
override def close(): Unit = {
session.close()
println("Session Closed...")
//something like session.close()
}
}

Calling part:

val datastreamFromString:DataStream[String]=env.fromElements(data) // where data is string
datastreamFromString.addSink(new CassandraAsSink())

我发现从String创建的DataStream有一些问题。这个班工作得很好。我已经将env变量初始化为类中的第二行。

Flink已经有一个Cassandra接收器;它有一些您从未尝试过支持的宝贵功能,尤其是检查点。

关于您的问题:

您可以将session设置为成员变量,该变量可以在open中初始化并在invoke中使用。

Flink将为进入接收器的每个流记录调用invoke方法。此记录作为value参数传递给调用。您需要从该值中提取name等字段。

你需要把水槽连接到你的工作图上;总的来说,它最终会变成这样:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.addSource(source)
... // some processing
.addSink(new CassandraAsSink())
env.execute()

顺便说一句,Flink文档中包含了一些培训课程,其中包括示例和练习,可以帮助您入门。

相关内容

  • 没有找到相关文章

最新更新