我正在尝试接收来自Kafka的消息,以使用Flink 更新Cassandra数据库
消息就像
case class Message(userId: String, info: Info)
case class Info(property1: Option[Int], property2: Option[Int])
我使用json4s
来解析Kafka消息,并将其提取到DataStream[Message]
中
val kafkaSource: KafkaSource[String] = KafkaSource.builder()
.setBootstrapServers("localhost:29092")
.setTopics("my-topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
现在,我只想更新不是None
的字段,比如
{
"user_id": "abc-123",
"info":
{
"property1": 1
}
}
将生成一个案例类,如:
Message(abc-123, Info(Some(1), None))
如何使CassandraSink
能够仅更新用户abc-123
的此属性?
我试着用一些类似的东西:
CassandraSink
.addSink(dataStream)
.setClusterBuilder(new ClusterBuilder() {
override def buildCluster(builder: Cluster.Builder): Cluster = {
builder
.addContactPoint("127.0.0.1")
.withPort(29042)
.withCredentials("cassandra", "cassandra")
.build()
}
})
.setQuery("UPDATE user_info SET property1 = ?, property2 = ? WHERE id = ?")
.build()
并试图在CassandraSink
构建器之外操作查询,但这是不可能的。
有没有办法只更新不是None
的字段?
您可以实现RichMapFunction,并在地图界面中更新非None的字段
public class MyMapFunction extends RichMapFunction<T> {
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
public T map(...) throws Exception {
return xxx;
}
}