仅更新Cassandra Sink中不为空的字段



我正在尝试接收来自Kafka的消息,以使用Flink 更新Cassandra数据库

消息就像

case class Message(userId: String, info: Info)
case class Info(property1: Option[Int], property2: Option[Int])

我使用json4s来解析Kafka消息,并将其提取到DataStream[Message]

val kafkaSource: KafkaSource[String] = KafkaSource.builder()
.setBootstrapServers("localhost:29092")
.setTopics("my-topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()

现在,我只想更新不是None的字段,比如

{
"user_id": "abc-123",
"info": 
{
"property1": 1
}
}

将生成一个案例类,如:

Message(abc-123, Info(Some(1), None))

如何使CassandraSink能够仅更新用户abc-123的此属性?

我试着用一些类似的东西:

CassandraSink
.addSink(dataStream)
.setClusterBuilder(new ClusterBuilder() {
override def buildCluster(builder: Cluster.Builder): Cluster = {
builder
.addContactPoint("127.0.0.1")
.withPort(29042)
.withCredentials("cassandra", "cassandra")
.build()
}
})
.setQuery("UPDATE user_info SET property1 = ?, property2 = ? WHERE id = ?")
.build()

并试图在CassandraSink构建器之外操作查询,但这是不可能的。

有没有办法只更新不是None的字段?

您可以实现RichMapFunction,并在地图界面中更新非None的字段

public class MyMapFunction extends RichMapFunction<T> {
@Override
public void open(Configuration parameters) throws Exception {
}
@Override
public T map(...) throws Exception {
return xxx;
}
}

相关内容

  • 没有找到相关文章

最新更新