Strimzi KafkaConnect & Connector error: connector won't load



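I'm not sure where else to turn: I have copied nearly every example I could find and still cannot get this to work. The connector will not install, and its status reports an empty password. Every step has been a struggle. Here are the steps I have taken.

Container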

FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001

Next, I create the secret for MySQL.

cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: v1
kind: Secret
metadata:
  name: mysql-auth
type: Opaque
stringData:
  mysql-auth.properties: |-
    username: root
    password: supersecret
EOF

Verify

% kubectl -n kafka-cloud get secrets | grep mysql-auth
mysql-auth                                     Opaque                                1      14m

Double-check that the user and password are not empty, since the connector status reports an error.

% kubectl -n kafka-cloud get secret mysql-auth -o yaml
apiVersion: v1
data:
  mysql-auth.properties: dXNlcm5hbWU6IHJvb3QKcGFzc3dvcmQ6IHN1cGVyc2VjcmV0
kind: Secret
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Secret","metadata":{"annotations":{},"name":"mysql-auth","namespace":"kafka-cloud"},"stringData":{"mysql-auth.properties":"username: root\npassword: supersecret"},"type":"Opaque"}
  creationTimestamp: "2022-03-02T23:48:55Z"
  name: mysql-auth
  namespace: kafka-cloud
  resourceVersion: "4041"
  uid: 14a7a878-d01f-4899-8dc7-81b515278f32
type: Opaque
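
The data field above is base64-encoded, so reading it by eye does not show whether both keys are present. A minimal sketch of decoding it locally, using the exact value from the output above; the variable names encoded and decoded are only for illustration, and no cluster access is assumed:

```shell
# Base64 value copied from the data field of the Secret output above.
encoded='dXNlcm5hbWU6IHJvb3QKcGFzc3dvcmQ6IHN1cGVyc2VjcmV0'

# Decode it locally; this step does not talk to the cluster.
decoded=$(printf '%s' "$encoded" | base64 --decode)

# Print the decoded properties file so both keys can be inspected.
printf '%s\n' "$decoded"
```

The decoded text contains a username line and a password line, which suggests the Secret itself is populated and that the empty values come from somewhere later in the chain.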

Add the Connect cluster

cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: connect-debezium
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: mysql-auth-config
        secret:
          secretName: mysql-auth
EOF

Add the connector

cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-test-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 172.17.0.13
    database.port: 3306
    database.user: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:username}"
    database.password: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:password}"
    database.server.id: 184054
    database.server.name: mysql-pod
    database.whitelist: sample
    database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: "schema-changes.sample"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
EOF

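Error

No matter what I try, I keep getting this error. I don't know what I'm missing. I know this is a simple configuration, but I can't figure it out. I'm stuck.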

% kubectl -n kafka-cloud describe kafkaconnector mysql-test-connector
Name:         mysql-test-connector
Namespace:    kafka-cloud
Labels:       strimzi.io/cluster=my-connect-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaConnector
Metadata:
  Creation Timestamp:  2022-03-02T23:44:20Z
  Generation:          1
  Managed Fields:
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
        f:labels:
          .:
          f:strimzi.io/cluster:
      f:spec:
        .:
        f:class:
        f:config:
          .:
          f:database.history.kafka.bootstrap.servers:
          f:database.history.kafka.topic:
          f:database.hostname:
          f:database.password:
          f:database.port:
          f:database.server.id:
          f:database.server.name:
          f:database.user:
          f:database.whitelist:
          f:key.converter:
          f:value.converter:
        f:tasksMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-03-02T23:44:20Z
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:conditions:
        f:observedGeneration:
        f:tasksMax:
        f:topics:
    Manager:         okhttp
    Operation:       Update
    Subresource:     status
    Time:            2022-03-02T23:44:20Z
  Resource Version:  3874
  UID:               c70ffe4e-3777-4524-af82-dad3a57ca25e
Spec:
  Class:  io.debezium.connector.mysql.MySqlConnector
  Config:
    database.history.kafka.bootstrap.servers:  my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic:              schema-changes.sample
    database.hostname:                         172.17.0.13
    database.password:                         
    database.port:                             3306
    database.server.id:                        184054
    database.server.name:                      mysql-pod
    database.user:                             
    database.whitelist:                        sample
    key.converter:                             org.apache.kafka.connect.storage.StringConverter
    value.converter:                           org.apache.kafka.connect.storage.StringConverter
  Tasks Max:                                   1
Status:
  Conditions:
    Last Transition Time:  2022-03-02T23:45:00.097311Z
    Message:               PUT /connectors/mysql-test-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    Reason:             ConnectRestException
    Status:             True
    Type:               NotReady
  Observed Generation:  1
  Tasks Max:            1
  Topics:
Events:  <none>
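
A side note on the empty database.user and database.password values in the Spec section above. One possible factor, offered as an assumption that the post itself does not confirm: the manifests are piped through an unquoted heredoc (<<EOF), and with an unquoted delimiter the shell performs parameter expansion on ${...} before kubectl ever reads the text. Quoting the delimiter (<<'EOF') keeps such placeholders literal. A minimal sketch that needs no cluster; the file name connector-credentials.yaml is hypothetical:

```shell
# Hypothetical scratch file name; any writable path works.
manifest=connector-credentials.yaml

# The quoted delimiter ('EOF') disables shell expansion inside the heredoc,
# so the ${file:...} placeholders are written out exactly as typed.
cat <<'EOF' > "$manifest"
database.user: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:username}"
database.password: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:password}"
EOF

# Show the lines that still contain a literal placeholder.
grep -F '${file:' "$manifest"
```

If grep prints both lines with the ${file:...} text intact, the placeholders reached the file unexpanded. The same quoting can be used when piping a manifest straight into kubectl apply.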

The configuration parameter the MySQL connector needed was:

database.allowPublicKeyRetrieval: true

That solved the problem.
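
For placement, connector options like this one sit under spec.config of the KafkaConnector resource, next to the other database.* keys. A minimal sketch that writes the addition as a small merge-patch fragment; the file name allow-public-key-patch.yaml is hypothetical, and applying it with kubectl patch is an assumption about workflow, since the post does not say how the parameter was added:

```shell
# Hypothetical file name for a fragment that adds the one missing key.
patch_file=allow-public-key-patch.yaml

# Quoted heredoc delimiter: the content is written without shell expansion.
cat <<'EOF' > "$patch_file"
spec:
  config:
    database.allowPublicKeyRetrieval: true
EOF

# Print the fragment. It could then be applied with, for example:
#   kubectl -n kafka-cloud patch kafkaconnector mysql-test-connector \
#     --type merge --patch-file "$patch_file"
cat "$patch_file"
```

Editing the original connector manifest and re-applying it would work equally well; the patch form simply keeps the change to a single key.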
