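I'm not sure where else to turn. I have copied nearly every example I could find and still can't get this to work: the connector won't install, and its status shows an empty password. Every step has been a struggle. Here are the steps I took.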
Container
FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
USER 1001
Next, I create the Secret for MySQL.
cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: v1
kind: Secret
metadata:
  name: mysql-auth
type: Opaque
stringData:
  mysql-auth.properties: |-
    username: root
    password: supersecret
EOF
Verify
% kubectl -n kafka-cloud get secrets | grep mysql-auth
mysql-auth Opaque 1 14m
Double-check that the username and password are not empty, since the connector status reports an error.
% kubectl -n kafka-cloud get secret mysql-auth -o yaml
apiVersion: v1
data:
  mysql-auth.properties: dXNlcm5hbWU6IHJvb3QKcGFzc3dvcmQ6IHN1cGVyc2VjcmV0
kind: Secret
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Secret","metadata":{"annotations":{},"name":"mysql-auth","namespace":"kafka-cloud"},"stringData":{"mysql-auth.properties":"username: root\npassword: supersecret"},"type":"Opaque"}
  creationTimestamp: "2022-03-02T23:48:55Z"
  name: mysql-auth
  namespace: kafka-cloud
  resourceVersion: "4041"
  uid: 14a7a878-d01f-4899-8dc7-81b515278f32
type: Opaque
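To confirm the Secret really holds both values, the base64 string from the `data` field above can be decoded and parsed. This is only a minimal sketch: `parse_properties` is a hypothetical helper that handles plain `key: value` and `key=value` lines, not the full Java properties format.

```python
import base64


def parse_properties(text):
    """Parse simple properties lines; the first '=' or ':' separates key and value."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        positions = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not positions:
            props[line] = ""
            continue
        cut = min(positions)
        props[line[:cut].strip()] = line[cut + 1:].strip()
    return props


# Value copied from the `data` field of the Secret shown above.
encoded = "dXNlcm5hbWU6IHJvb3QKcGFzc3dvcmQ6IHN1cGVyc2VjcmV0"
decoded = base64.b64decode(encoded).decode("utf-8")
props = parse_properties(decoded)
print(props)  # {'username': 'root', 'password': 'supersecret'}
```

Both keys come back non-empty, so the Secret itself looks fine.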
Add the Connect cluster
cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: connect-debezium
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: mysql-auth-config
        secret:
          secretName: mysql-auth
EOF
Add the connector
cat <<EOF | kubectl apply -n kafka-cloud -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-test-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 172.17.0.13
    database.port: 3306
    database.user: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:username}"
    database.password: "${file:/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties:password}"
    database.server.id: 184054
    database.server.name: mysql-pod
    database.whitelist: sample
    database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: "schema-changes.sample"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
EOF
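For anyone unsure what the `${file:<path>:<key>}` placeholders are meant to do, here is a rough Python model of the substitution. It is an illustration only, not Kafka's `FileConfigProvider` code: `resolve`, `PLACEHOLDER`, and the in-memory `files` dict are made-up names, and leaving unknown placeholders untouched is a choice made for this sketch. The path is the one used in the connector config above.

```python
import re

# Matches ${provider:path:key}; the path may contain '/' but no ':'.
PLACEHOLDER = re.compile(
    r"\$\{(?P<provider>[^:}]+):(?P<path>[^:}]*):(?P<key>[^}]+)\}"
)


def resolve(value, files):
    """Replace ${file:<path>:<key>} using `files`, a dict of path -> properties dict.

    Placeholders with an unknown provider, path, or key are left as they are.
    """
    def substitute(match):
        if match.group("provider") != "file":
            return match.group(0)
        props = files.get(match.group("path"), {})
        return props.get(match.group("key"), match.group(0))

    return PLACEHOLDER.sub(substitute, value)


# Stand-in for the mounted Secret file and its parsed contents.
mount = "/opt/kafka/external-configuration/mysql-auth-config/mysql-auth.properties"
files = {mount: {"username": "root", "password": "supersecret"}}

config = {
    "database.user": "${file:%s:username}" % mount,
    "database.password": "${file:%s:password}" % mount,
    "database.port": "3306",
}
resolved = {key: resolve(value, files) for key, value in config.items()}
print(resolved["database.user"])  # root
```

The point is that the placeholder text has to reach Kafka Connect intact so the provider can swap in the value from the mounted file at runtime.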
Error
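No matter what I try, I get this error. I don't know what I'm missing. I know this should be a simple configuration, but I can't figure it out. I'm stuck.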
% kubectl -n kafka-cloud describe kafkaconnector mysql-test-connector
Name: mysql-test-connector
Namespace: kafka-cloud
Labels: strimzi.io/cluster=my-connect-cluster
Annotations: <none>
API Version: kafka.strimzi.io/v1beta2
Kind: KafkaConnector
Metadata:
  Creation Timestamp: 2022-03-02T23:44:20Z
  Generation: 1
  Managed Fields:
    API Version: kafka.strimzi.io/v1beta2
    Fields Type: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
        f:labels:
          .:
          f:strimzi.io/cluster:
      f:spec:
        .:
        f:class:
        f:config:
          .:
          f:database.history.kafka.bootstrap.servers:
          f:database.history.kafka.topic:
          f:database.hostname:
          f:database.password:
          f:database.port:
          f:database.server.id:
          f:database.server.name:
          f:database.user:
          f:database.whitelist:
          f:key.converter:
          f:value.converter:
        f:tasksMax:
    Manager: kubectl-client-side-apply
    Operation: Update
    Time: 2022-03-02T23:44:20Z
    API Version: kafka.strimzi.io/v1beta2
    Fields Type: FieldsV1
    fieldsV1:
      f:status:
        .:
        f:conditions:
        f:observedGeneration:
        f:tasksMax:
        f:topics:
    Manager: okhttp
    Operation: Update
    Subresource: status
    Time: 2022-03-02T23:44:20Z
  Resource Version: 3874
  UID: c70ffe4e-3777-4524-af82-dad3a57ca25e
Spec:
  Class: io.debezium.connector.mysql.MySqlConnector
  Config:
    database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.sample
    database.hostname: 172.17.0.13
    database.password:
    database.port: 3306
    database.server.id: 184054
    database.server.name: mysql-pod
    database.user:
    database.whitelist: sample
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
  Tasks Max: 1
Status:
  Conditions:
    Last Transition Time: 2022-03-02T23:45:00.097311Z
    Message: PUT /connectors/mysql-test-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
A value is required
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    Reason: ConnectRestException
    Status: True
    Type: NotReady
  Observed Generation: 1
  Tasks Max: 1
  Topics:
Events: <none>
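In the Spec section of the output above, `database.password` and `database.user` are printed with no value, which may be related to the "A value is required" message. A quick way to spot such keys in a long config listing is a small check like the one below. It is only a sketch: `empty_keys` is a hypothetical helper, and the sample lines are a subset copied from the describe output.

```python
def empty_keys(config_lines):
    """Return the keys whose value is blank in 'key: value' lines."""
    missing = []
    for line in config_lines:
        # Split at the first ':' only, so values such as host:9092 stay whole.
        key, sep, value = line.strip().partition(":")
        if sep and not value.strip():
            missing.append(key)
    return missing


spec_config = """
database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
database.hostname: 172.17.0.13
database.password:
database.port: 3306
database.user:
database.whitelist: sample
""".strip().splitlines()

print(empty_keys(spec_config))  # ['database.password', 'database.user']
```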
The configuration parameter the MySQL connector needed was:
database.allowPublicKeyRetrieval: true
That solved the problem.
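The property goes under `spec.config` of the KafkaConnector resource. If you would rather patch the existing resource than re-apply the whole manifest, something like the following builds a JSON merge patch and the matching `kubectl patch` command. This is a sketch under assumptions: the resource name and namespace are taken from the question, and the variable names are only illustrative.

```python
import json
import shlex

# Only the key being added; a merge patch leaves the other config keys untouched.
patch = {"spec": {"config": {"database.allowPublicKeyRetrieval": True}}}
patch_json = json.dumps(patch)

# Assumed from the question: namespace kafka-cloud, connector mysql-test-connector.
command = " ".join([
    "kubectl", "-n", "kafka-cloud",
    "patch", "kafkaconnector", "mysql-test-connector",
    "--type", "merge",
    "-p", shlex.quote(patch_json),
])
print(command)
```

The printed line can be pasted into a shell; `shlex.quote` wraps the JSON so the shell passes it through unchanged.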