我有一条消息,它可以与我的Redis接收器连接器一起使用(连接器为Redis赋值(:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "registertime"
},
{
"type": "string",
"optional": false,
"field": "userid"
},
{
"type": "string",
"optional": false,
"field": "regionid"
},
{
"type": "string",
"optional": false,
"field": "after"
}
],
"optional": false,
"name": "ksql.users"
},
"payload": {
"registertime": 1493819497170,
"userid": "User_1",
"regionid": "Region_5",
"after": "MALE"
}
}
但我想拥有";在";作为嵌套对象:
"after": {
"one": null,
"two": "one"
}
并在此基础上处理数据(即,如果"一"为空,则跳过(。
所以我有一个连接器:
{
"name": "connector1",
"config": {
"topics": "topic1",
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"tasks.max": "1",
"connect.redis.error.policy": "NOOP",
"connect.redis.host": "localhost",
"connect.redis.port": "6379",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.redis.kcql": "INSERT INTO prod- SELECT * FROM topic1 PK after;"
}
}
我想从messsage中的嵌套json中提取数据。我的命令是发送信息:
topc=topic1
message=message.json
echo "key:$(jq -rc . $message)" | $kafka_dir/bin/kafka-console-producer.sh --topic $topic --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:"
我如何发送嵌套的json对象,如何通过Transforms从中只提取一个字段,以及是否基于它的值过程?
发送嵌套数据的工作原理与常规消息相同。
您应该能够更新消息以包含类似于后字段的模式信息
"type": "struct",
"fields": [{"field": "one", "optional": false, "type":"string"},... ],
"optional": false,
"field": "after"
然后相应地更新有效载荷。
我个人从未使用过JSONConverter模式/负载类型,因为Avro更适合这种场景
据我所知,Kafka Connect不能跳过消息;它会处理所有这些。也没有用于提取深度超过1的任意嵌套值的内置转换,因此在after
结构中获取字段可能是个问题。但是,您可以通过修改这个特定连接器的SELECT * FROM topic1
CQL语句来获得它
一般来说,如果您需要这样的逻辑,您可以使用流处理器,如KSQL或Kafka Streams,在转储到数据库之前过滤/修改主题