在使用 kafka-cassandra 接收器连接器存储在 cassandra 中之前,是否有任何功能可以在我们的消息中



我正在设置我的日志服务器。我使用 Fluentd 将日志转发到 Kafka,然后将它们存储在 Cassandra 中以供以后使用。为此,我使用的是卡夫卡-卡桑德拉水槽连接器。我必须按时间顺序存储数据,为此我需要在 cassandra 中的消息中添加时间戳。如何做到这一点?

Datamountaineer 连接器使用 kcql,我认为它不支持在日志中插入时间戳。

我的连接器配置如下:

name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=test_AF1
connect.cassandra.kcql=INSERT INTO test_event1 SELECT now() as id, message as msg FROM test_AF1 TIMESTAMP=sys_time()
connect.cassandra.port=9042
connect.cassandra.contact.points=localhost
connect.cassandra.key.space=demo

Kafka Connect的Single Message Transform可以做到这一点。下面是一个示例:

{
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"topics": "test_AF1",
…
"transforms": "addTS",
"transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTS.timestamp.field": "op_ts"
}'

这会向消息负载添加一个名为op_ts的字段,其中包含 Kafka 消息的时间戳。

我不知道它如何与 KCQL 交互;你可能想看看我知道的另外两个 Cassandra 接收器:

  • https://www.confluent.io/hub/confluentinc/kafka-connect-cassandra
  • https://www.confluent.io/hub/datastax/kafka-connect-dse

相关内容

  • 没有找到相关文章

最新更新