Connecting Ditto to InfluxDB via Kafka



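I am running Kafka and InfluxDB on Docker. I created a digital twin in Ditto, and it updates correctly when I send messages over MQTT. I want the data to be sent from Ditto to InfluxDB, but in InfluxDB, once I have created the bucket, it shows no data at all.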

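I followed this guide: https://www.influxdata.com/blog/getting-started-apache-kafka-influxdb/ (I know it uses a Python program, but the steps should be the same; I am just using the Telegraf kafka_consumer plugin instead of the one used in the guide). I have created the connection and the configuration file for Telegraf, but nothing changes in InfluxDB.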

This is the telegraf.conf:

[[outputs.influxdb_v2]]
## The URLs of the InfluxDB cluster nodes.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
##   ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://localhost:8086"]
## API token for authentication.
token = "$INFLUX_TOKEN"
## Organization is the name of the organization you wish to write to; must exist.
organization = "digital"
## Destination bucket to write into.
bucket = "arduino"
## The value of this tag will be used to determine the bucket.  If this
## tag is not set the 'bucket' option is used as the default.
# bucket_tag = ""
## If true, the bucket tag will not be added to the metric.
# exclude_bucket_tag = false
## Timeout for HTTP messages.
# timeout = "5s"
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"
## HTTP User-Agent
# user_agent = "telegraf"
## Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "gzip"
## Enable or disable uint support for writing uints influxdb 2.0.
# influx_uint_support = false
## Optional TLS Config for use on HTTP connections.
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
# Read metrics from Kafka topics
[[inputs.kafka_consumer]]
## Kafka brokers.
brokers = ["localhost:9092"]
## Topics to consume.
topics = ["arduino"]
## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""
## Optional Client id
# client_id = "Telegraf"
## Set the minimal supported Kafka version.  Setting this enables the use of new
## Kafka features and APIs.  Must be 0.10.2.0 or greater.
##   ex: version = "1.1.0"
# version = ""
## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## SASL authentication credentials.  These settings should typically be used
## with TLS encryption enabled
# sasl_username = "kafka"
# sasl_password = "secret"
## Optional SASL:
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
## (defaults to PLAIN)
# sasl_mechanism = ""
## used if sasl_mechanism is GSSAPI (experimental)
# sasl_gssapi_service_name = ""
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
# sasl_gssapi_kerberos_config_path = "/"
# sasl_gssapi_realm = "realm"
# sasl_gssapi_key_tab_path = ""
# sasl_gssapi_disable_pafxfast = false
## used if sasl_mechanism is OAUTHBEARER (experimental)
# sasl_access_token = ""
## SASL protocol version.  When connecting to Azure EventHub set to 0.
# sasl_version = 1
# Disable Kafka metadata full fetch
# metadata_full = false
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"
## Compression codec represents the various compression codecs recognized by
## Kafka in messages.
##  0 : None
##  1 : Gzip
##  2 : Snappy
##  3 : LZ4
##  4 : ZSTD
# compression_codec = 0
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
## Maximum messages to read from the broker that have not been written by an
## output.  For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Maximum amount of time the consumer should take to process messages. If
## the debug log prints messages from sarama about 'abandoning subscription
## to [topic] because consuming was taking too long', increase this value to
## longer than the time taken by the output plugin(s).
##
## Note that the effective timeout could be between 'max_processing_time' and
## '2 * max_processing_time'.
# max_processing_time = "100ms"
## The default number of message bytes to fetch from the broker in each
## request (default 1MB). This should be larger than the majority of
## your messages, or else the consumer will spend a lot of time
## negotiating sizes and not actually consuming. Similar to the JVM's
## `fetch.message.max.bytes`.
# consumer_fetch_default = "1MB"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

The Kafka connection, as it appears in the Ditto explorer:

{
"id": "0ab4b527-617f-4f4f-8bac-4ffa4b5a8471",
"name": "Kafka 2.x",
"connectionType": "kafka",
"connectionStatus": "open",
"uri": "tcp://192.168.109.74:9092",
"sources": [
{
"addresses": [
"arduino"
],
"consumerCount": 1,
"qos": 1,
"authorizationContext": [
"nginx:ditto"
],
"enforcement": {
"input": "{{ header:device_id }}",
"filters": [
"{{ entity:id }}"
]
},
"acknowledgementRequests": {
"includes": []
},
"headerMapping": {},
"payloadMapping": [
"Ditto"
],
"replyTarget": {
"address": "theReplyTopic",
"headerMapping": {},
"expectedResponseTypes": [
"response",
"error",
"nack"
],
"enabled": true
}
}
],
"targets": [
{
"address": "topic/key",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": [
"nginx:ditto"
],
"headerMapping": {}
}
],
"clientCount": 1,
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 1,
"specificConfig": {
"saslMechanism": "plain",
"bootstrapServers": "localhost:9092"
},
"tags": []
}

The policy file for Ditto:

{
"policyId": "my.test:policy1",
"entries": {
"owner": {
"subjects": {
"nginx:ditto": {
"type": "nginx basic auth user"
}
},
"resources": {
"thing:/": {
"grant": ["READ","WRITE"],
"revoke": []
},
"policy:/": {
"grant": ["READ","WRITE"],
"revoke": []
},
"message:/": {
"grant": ["READ","WRITE"],
"revoke": []
}
}
},
"observer": {
"subjects": {
"ditto:observer": {
"type": "observer user"
}
},
"resources": {
"thing:/features": {
"grant": ["READ"],
"revoke": []
},
"policy:/": {
"grant": ["READ"],
"revoke": []
},
"message:/": {
"grant": ["READ"],
"revoke": []
}
}
}
}
}

"... the configuration file for Telegraf, but nothing changes in InfluxDB"

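When Telegraf reads data from Kafka, it needs to convert it into time series metrics that InfluxDB can ingest. You have correctly chosen the JSON parser, but it probably needs additional configuration, or you may even need the more powerful json_v2 parser, so that tags and fields can be set from your JSON data.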

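My suggestion is to use the [[outputs.file]] output to check whether anything is getting through at all; most likely nothing will show up. Then do the following: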

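  1. Determine what your JSON looks like in Kafka
  2. Decide what you want that JSON to look like as time series data in InfluxDB
  3. Use the json_v2 parser to set the appropriate tags and fields.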
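
The steps above could be sketched roughly as the following telegraf.conf fragment. It is only a starting point under stated assumptions, not a verified configuration: it assumes each Kafka message is a single JSON object with top-level "topic" and "path" strings and a numeric "value" (roughly the shape of a Ditto event), and the measurement name is an arbitrary choice. Check the real message shape first and adjust the paths accordingly.

```toml
# Debug output: print every metric Telegraf produces to stdout in line protocol.
# If nothing is printed, either no messages arrive or the parser yields no metrics.
[[outputs.file]]
  files = ["stdout"]
  data_format = "influx"

[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["arduino"]
  # Switch from the plain "json" parser to json_v2.
  data_format = "json_v2"

  [[inputs.kafka_consumer.json_v2]]
    # Arbitrary measurement name chosen for this sketch.
    measurement_name = "arduino"

    # Assumption: the message has top-level "topic" and "path" string keys.
    [[inputs.kafka_consumer.json_v2.tag]]
      path = "topic"
    [[inputs.kafka_consumer.json_v2.tag]]
      path = "path"

    # Assumption: "value" holds a single number. If it is an object,
    # point the path at the nested key instead (for example "value.temperature",
    # a hypothetical key).
    [[inputs.kafka_consumer.json_v2.field]]
      path = "value"
      type = "float"
```

This [[inputs.kafka_consumer]] block is meant to replace the existing one rather than sit next to it, and [[outputs.file]] can stay alongside [[outputs.influxdb_v2]] while debugging. Without a timestamp setting, the metrics get the time at which Telegraf received them.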
