I am trying to use the lenses.io Stream Reactor to consume messages from MQTT into Kafka. I am on the latest version of Stream Reactor.
Kafka/Confluent version:
sh-4.4$ kafka-topics --version
7.1.0-ccs (Commit:c86722379ab997cc)
kafka-connect-mqtt-3.0.1-2.5.0-all.jar
Expected behavior: the Avro topic should be printed on the console.
Actual behavior: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Additional details:
kafka-connect:
  image: kafka-connect
  build:
    context: .
  hostname: kafka-connect
  container_name: kafka-connect
  depends_on:
    - zookeeper-1
    - kafka-broker-1
  ports:
    - 8083:8083
  environment:
    CONNECT_BOOTSTRAP_SERVERS: SSL://kafka-broker-1:19093
    CONNECT_GROUP_ID: 'kafka-connect'
    CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_CONFIG_STORAGE_TOPIC: 'connect-config-storage'
    CONNECT_OFFSET_STORAGE_TOPIC: 'connect-offset-storage'
    CONNECT_STATUS_STORAGE_TOPIC: 'connect-status-storage'
    CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
    CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
    CONNECT_PLUGIN_PATH: /etc/kafka/secrets/plugins
    CONNECT_SECURITY_PROTOCOL: 'SSL'
    CONNECT_SSL_KEY_PASSWORD: confluent
    CONNECT_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
    CONNECT_SSL_KEYSTORE_PASSWORD: confluent
    CONNECT_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
    CONNECT_SSL_TRUSTSTORE_PASSWORD: confluent
    CONNECT_KAFKASTORE_SECURITY_PROTOCOL: 'SSL'
    CONNECT_KAFKASTORE_SSL_KEY_PASSWORD: confluent
    CONNECT_KAFKASTORE_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
    CONNECT_KAFKASTORE_SSL_KEYSTORE_PASSWORD: confluent
    CONNECT_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
    CONNECT_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: confluent
    CONNECT_PRODUCER_SECURITY_PROTOCOL: 'SSL'
    CONNECT_PRODUCER_SSL_KEY_PASSWORD: confluent
    CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
    CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: confluent
    CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
    CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: confluent
    CONNECT_CONSUMER_SECURITY_PROTOCOL: 'SSL'
    CONNECT_CONSUMER_SSL_KEY_PASSWORD: confluent
    CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
    CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: confluent
    CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
    CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: confluent
  volumes:
    - ${KAFKA_SSL_SECRETS_DIR}/connects:/etc/kafka/secrets
  networks:
    - kafka-cluster-network
Connector properties configuration (my-connector.properties):
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"name": "mqtt-source",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"tasks.max": "1",
"topics": "mqtt",
"connect.mqtt.connection.clean": "true",
"connect.mqtt.connection.timeout": "1000",
"connect.mqtt.kcql": "INSERT INTO mqtt SELECT * FROM /ais",
"connect.mqtt.connection.keep.alive": "1000",
"connect.mqtt.source.converters": "/ais=com.datamountaineer.streamreactor.connect.converters.source.AvroConverter",
"connect.source.converter.avro.schemas": "/ais=/etc/kafka/secrets/plugins/classAPositionReportSchema.json",
"connect.mqtt.client.id": "dm_source_id",
"connect.mqtt.converter.throw.on.error": "true",
"connect.mqtt.hosts": "tcp://mqtt:1883",
"connect.mqtt.service.quality": "1"
}' http://localhost:8083/connectors/mqtt-source/config | jq .
Avro schema JSON file:
{
"type": "record",
"name": "aisClassAPositionReport",
"namespace": "com.landoop.ais",
"doc": "Schema for AIS Class A Position Reports.",
"fields": [
{
"name": "Type",
"type": "int",
"doc": "The type of the AIS Message. 1/2/3 are Class A position reports."
},
{
"name": "Repeat",
"type":"int",
"doc": "Repeat Indicator"
},
{
"name": "MMSI",
"type": "long",
"doc": "User ID (MMSI)"
},
{
"name": "Speed",
"type": "float",
"doc": "Speed over Ground (SOG)"
},
{
"name": "Accuracy",
"type": "boolean",
"doc": "Position Accuracy"
},
{
"name": "Longitude",
"type": "double",
"doc": "Longitude"
},
{
"name": "Latitude",
"type": "double",
"doc": "Latitude"
},
{
"name": "Course",
"type": "float",
"doc": "Course over Ground (COG)"
},
{
"name": "Heading",
"type": "int",
"doc": "True Heading (HDG)"
},
{
"name": "Second",
"type": "int",
"doc": "Time Stamp"
},
{
"name": "RAIM",
"type": "boolean",
"doc": "RAIM flag"
},
{
"name": "Radio",
"type": "long",
"doc": "Radio Status"
},
{
"name": "Status",
"type": "int",
"doc": "Navigation Status (enumerated type)"
},
{
"name": "Turn",
"type": "float",
"doc": "Rate of Turn (ROT)"
},
{
"name": "Maneuver",
"type": "int",
"doc": "Manuever Indicator (enumerated type)"
},
{
"name": "Timestamp",
"type": "long",
"doc": "Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering."
}
]
}
MQTT broker message:
mosquitto_pub \
-m '{"Type": 384558914, "Repeat": 1429873353, "MMSI": 1421443607430111832, "Speed": 0.32155126, "Accuracy": true, "Longitude": 0.3627212439937161, "Latitude": 0.2725890739370421, "Course": 0.99500954, "Heading": -2064209033, "Second": -1096102271, "RAIM": true, "Radio": -189624595456590919, "Status": -139830130, "Turn": 0.035991907, "Maneuver": 1595359693, "Timestamp": -932628952948741103}' \
-d -r -t /ais
Full log:
sh-4.4$ kafka-avro-console-consumer --bootstrap-server kafka-broker-1:19093 --topic mqtt --from-beginning --max-messages 10 --consumer.config /etc/kafka/secrets/host.consumer.ssl.config --property schema.registry.url=http://0.0.0.0:8081
[2022-04-13 05:49:22,333] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-13 05:49:23,082] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-broker-1:19093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = console-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-27706
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /etc/kafka/secrets/kafka.consumer.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /etc/kafka/secrets/kafka.consumer.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-04-13 05:49:23,269] INFO Kafka version: 7.1.0-ce (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka commitId: 5c05312ab63acecf (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka startTimeMs: 1649828963261 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,274] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Subscribed to topic(s): mqtt (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:24,085] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting the last seen epoch of partition mqtt-0 to 0 since the associated topicId changed from null to eLc9qW-WTemQ53DDH9JgzA (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,093] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Cluster ID: 7mB45_SgTXSxROWQruwrRQ (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,095] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Discovered group coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,099] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,167] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: need to re-join with the given member-id (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,168] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,174] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully joined group with generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,179] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Finished assignment for group at generation 1: {console-consumer-35015e12-2725-473a-b7b1-70cce478ed76=Assignment(partitions=[mqtt-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,202] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully synced group in generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,203] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Notifying assignor about the new Assignment(partitions=[mqtt-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,210] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Adding newly assigned partitions: mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,234] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Found no committed offset for partition mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,370] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Seeking to offset 1 for partition mqtt-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Revoke previously assigned partitions mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Member console-consumer-35015e12-2725-473a-b7b1-70cce478ed76 sending LeaveGroup request to coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,374] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,375] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,383] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,396] INFO App info kafka.consumer for console-consumer unregistered (org.apache.kafka.common.utils.AppInfoParser)
Processed a total of 1 messages
[2022-04-13 05:49:27,400] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:87)
at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Update:
I also registered the schema in the Schema Registry:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{ "schema": "{\"type\": \"record\", \"name\": \"aisClassAPositionReport\", \"namespace\": \"com.landoop.ais\", \"doc\": \"Schema for AIS Class A Position Reports.\", \"fields\": [{\"name\": \"Type\",\"type\": \"int\",\"doc\": \"The type of the AIS Message. 1/2/3 are Class A position reports.\"},{\"name\": \"Repeat\",\"type\":\"int\",\"doc\": \"Repeat Indicator\"},{\"name\": \"MMSI\",\"type\": \"long\",\"doc\": \"User ID (MMSI)\"},{\"name\": \"Speed\",\"type\": \"float\",\"doc\": \"Speed over Ground (SOG)\"},{\"name\": \"Accuracy\",\"type\": \"boolean\",\"doc\": \"Position Accuracy\"},{\"name\": \"Longitude\",\"type\": \"double\",\"doc\": \"Longitude\"},{\"name\": \"Latitude\",\"type\": \"double\",\"doc\": \"Latitude\"},{\"name\": \"Course\",\"type\": \"float\",\"doc\": \"Course over Ground (COG)\"},{\"name\": \"Heading\",\"type\": \"int\",\"doc\": \"True Heading (HDG)\"},{\"name\": \"Second\",\"type\": \"int\",\"doc\": \"Time Stamp\"},{\"name\": \"RAIM\",\"type\": \"boolean\",\"doc\": \"RAIM flag\"},{\"name\": \"Radio\",\"type\": \"long\",\"doc\": \"Radio Status\"},{\"name\": \"Status\",\"type\": \"int\",\"doc\": \"Navigation Status (enumerated type)\"},{\"name\": \"Turn\",\"type\": \"float\",\"doc\": \"Rate of Turn (ROT)\"},{\"name\": \"Maneuver\",\"type\": \"int\",\"doc\": \"Manuever Indicator (enumerated type)\"},{\"name\": \"Timestamp\",\"type\": \"long\",\"doc\": \"Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering.\"}]}"}' http://0.0.0.0:8081/subjects/mqtt-value/versions
What could I be doing wrong?
According to CONNECT_VALUE_CONVERTER, you are producing JSON, not Avro, so you cannot use kafka-avro-console-consumer to read the data, and your Avro schema is not being used either.
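To illustrate why the consumer fails: the Confluent Avro deserializer expects every record value to start with a magic byte of 0 followed by a 4-byte big-endian schema ID, while a JSON value starts with `{`. The following is only a sketch of that check in Python; the function name `describe_value` and the sample payloads are made up for illustration and are not part of any Kafka library.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed record value


def describe_value(raw: bytes) -> str:
    """Guess whether a raw record value is Confluent-framed or plain JSON."""
    if len(raw) >= 5 and raw[0] == MAGIC_BYTE:
        # Bytes 1..4 hold the schema ID as a big-endian unsigned int.
        (schema_id,) = struct.unpack(">I", raw[1:5])
        return f"Confluent wire format, schema id {schema_id}"
    if raw[:1] in (b"{", b"["):
        return "plain JSON text (no magic byte)"
    return "unknown encoding"


# Hypothetical sample values, not taken from the real topic.
json_value = b'{"Type": 1, "Repeat": 0}'
framed_value = b"\x00" + struct.pack(">I", 7) + b"\x02\x00"

print(describe_value(json_value))    # plain JSON text (no magic byte)
print(describe_value(framed_value))  # Confluent wire format, schema id 7
```

With the JsonConverter settings shown in the question, values on the mqtt topic would fall into the first category, which is consistent with the "Unknown magic byte!" error.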
I am not sure how connect.mqtt.source.converters works, but you have not configured it to use any registry, which is a requirement for kafka-avro-console-consumer to work.
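If you do want Avro on the topic, one option is to switch the worker's value converter to Confluent's AvroConverter and point it at the registry. The sketch below just renders the two settings as docker-compose environment lines. The registry URL `http://schema-registry:8081` is an assumption (use whatever host your Connect container can actually reach), the helper `render_compose_env` is hypothetical, and I have not verified that your custom kafka-connect image ships the Avro converter.

```python
# Assumed worker overrides; the registry URL is a placeholder, not from the question.
AVRO_WORKER_ENV = {
    "CONNECT_VALUE_CONVERTER": "io.confluent.connect.avro.AvroConverter",
    "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL": "http://schema-registry:8081",
}


def render_compose_env(env: dict) -> str:
    """Render settings as lines for a docker-compose `environment:` mapping."""
    return "\n".join(f"{key}: '{value}'" for key, value in env.items())


print(render_compose_env(AVRO_WORKER_ENV))
```

Alternatively, keep the JSON converter and read the topic with the plain kafka-console-consumer instead of the Avro one.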