我正在设置我的日志服务器。我使用 Fluentd 将日志转发到 Kafka,然后将它们存储在 Cassandra 中以供以后使用。为此,我使用的是卡夫卡-卡桑德拉水槽连接器。我必须按时间顺序存储数据,为此我需要在 cassandra 中的消息中添加时间戳。如何做到这一点?
Datamountaineer 连接器使用 kcql,我认为它不支持在日志中插入时间戳。
我的连接器配置如下:
name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=test_AF1
connect.cassandra.kcql=INSERT INTO test_event1 SELECT now() as id, message as msg FROM test_AF1 TIMESTAMP=sys_time()
connect.cassandra.port=9042
connect.cassandra.contact.points=localhost
connect.cassandra.key.space=demo
Kafka Connect的Single Message Transform可以做到这一点。下面是一个示例:
{
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"topics": "test_AF1",
…
"transforms": "addTS",
"transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTS.timestamp.field": "op_ts"
}'
这会向消息负载添加一个名为op_ts
的字段,其中包含 Kafka 消息的时间戳。
我不知道它如何与 KCQL 交互;你可能想看看我知道的另外两个 Cassandra 接收器:
- https://www.confluent.io/hub/confluentinc/kafka-connect-cassandra
- https://www.confluent.io/hub/datastax/kafka-connect-dse