I am using the Confluent InfluxDB connector to send data from Kafka to InfluxDB. The configuration looks like this:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=myurl
topics=mytopic
tasks.max=1
The schema looks like this:
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "sn",
"type": "string"
},
{
"name": "value",
"type": "float"
},
{
"name": "tagnum",
"type": "string"
}
]
}
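When the data is sent from Kafka to InfluxDB, every data item is treated as a FIELD.
When sending from Kafka to InfluxDB with the InfluxDB connector, how can I set some of the data items as TAGs, for example "tagnum"?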
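Your schema should look like the one below. The important part is that your tags sit inside a map field (called "tags" here), separate from the regular fields.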
{
"type": "struct",
"fields": [
{ "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false, "field": "tags" },
{ "field": "sn", "optional": false, "type": "string" },
{ "field": "value", "optional": false, "type": "float" }
],
"optional": false,
"version": 1
}
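As a rough illustration of the reshaping described above, the following Python sketch wraps a flat record in a schema/payload envelope and moves selected items into the "tags" map. The function name to_influx_envelope and the small type mapping are hypothetical helpers invented for this example, not part of the connector; it only handles the string and float types that appear in the question.

```python
import json

def to_influx_envelope(record, tag_fields):
    """Wrap a flat record in a schema/payload envelope (hypothetical helper).

    Items named in tag_fields are moved into a "tags" map of strings;
    everything else stays a top-level field.
    """
    tags = {name: str(record[name]) for name in tag_fields}
    fields = {k: v for k, v in record.items() if k not in tag_fields}

    # The map field that carries the tags, as in the schema above.
    schema_fields = [{
        "type": "map",
        "keys": {"type": "string", "optional": False},
        "values": {"type": "string", "optional": False},
        "optional": False,
        "field": "tags",
    }]

    for name, value in fields.items():
        # Assumption: only the two types used in the question are supported.
        if isinstance(value, str):
            kind = "string"
        elif isinstance(value, float):
            kind = "float"
        else:
            raise TypeError(f"unsupported type for field {name!r}")
        schema_fields.append({"field": name, "optional": False, "type": kind})

    return {
        "schema": {
            "type": "struct",
            "fields": schema_fields,
            "optional": False,
            "version": 1,
        },
        "payload": {"tags": tags, **fields},
    }

# The record from the question, with "tagnum" promoted to a tag.
message = to_influx_envelope(
    {"sn": "FOO", "value": 500.0, "tagnum": "5"}, ["tagnum"]
)
print(json.dumps(message))
```

The printed JSON is a single line that could be produced to the topic as the message value, provided the sink uses JsonConverter with schemas enabled.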
Here is an example of sending a JSON payload:
kafkacat -b localhost:9092 -P -t testdata-json4 <<EOF
{ "schema": { "type": "struct", "fields": [ { "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false, "field": "tags" }, { "field": "sn", "optional": false, "type": "string" }, { "field": "value", "optional": false, "type": "float" } ], "optional": false, "version": 1 }, "payload": { "tags": { "tagnum": "5" }, "sn": "FOO", "value": 500.0 } }
EOF
curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/SINK_INFLUX_01/config \
    -d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "testdata-json4",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
The result in InfluxDB:
$ influx -execute 'SELECT * FROM "testdata-json4" GROUP BY tagnum;' -database "my_db"
name: testdata-json4
tags: tagnum=5
time sn value
---- -- -----
1579713749737000000 FOO 500
Reference: https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/