How to correctly connect a Kafka connector to Elasticsearch deployed by the Elastic Operator?



I have some CDC data in Kafka. Now I am trying to sink it from Kafka into Elasticsearch. Here is what I have done so far:

Step 1 - Deploy Elasticsearch in Kubernetes (succeeded)

I deployed Elasticsearch in Kubernetes using the Elastic Operator, following these tutorials:

  1. Deploy ECK in the Kubernetes cluster: https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-deploy-eck.html
  2. Deploy the Elasticsearch cluster: https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-deploy-elasticsearch.html
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: hm-elasticsearch
  namespace: elastic
spec:
  version: 7.14.0
  nodeSets:
  - name: default
    count: 1
    config:
      node.store.allow_mmap: false

Following the tutorial, I can successfully make the call below by providing the username elastic and the password passw0rd:

curl -u "elastic:passw0rd" -k "https://hm-elasticsearch-es-http.elastic:9200"

which returns

{
  "name": "hm-elasticsearch-es-default-0",
  "cluster_name": "hm-elasticsearch",
  "cluster_uuid": "TWgIk0YGR_GVr7IJZcW62g",
  "version": {
    "number": "7.14.0",
    "build_flavor": "default",
    "build_type": "docker",
    "build_hash": "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
    "build_date": "2021-07-29T20:49:32.864135063Z",
    "build_snapshot": false,
    "lucene_version": "8.9.0",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}

Step 2 - Add the ElasticsearchSinkConnector (failed to connect to Elasticsearch)

Now I am trying to add the ElasticsearchSinkConnector; however, I am having trouble setting it up.

I looked at How to Kafka Connect Elasticsearch with SSL?. Elastic Cloud only needs a username and password to be passed, so I thought my case would be similar.

Also, based on this ElasticsearchSinkConnector config, I wrote my config and then tried to validate it by

curl --location --request PUT 'http://hm-connect-cluster-connect-api.kafka:8083/connector-plugins/io.confluent.connect.elasticsearch.ElasticsearchSinkConnector/config/validate' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "name": "elasticsearch-sink",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "opa_db_server.public.roles",
    "connection.url": "https://hm-elasticsearch-es-http.elastic:9200",
    "connection.username": "elastic",
    "connection.password": "passw0rd",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "role_id",
    "key.ignore": "false",
    "behavior.on.null.values": "delete"
  }'

It returns this error:

{
  "name": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "error_count": 3,
  "groups": [
    "Common",
    "Transforms",
    "Predicates",
    "Error Handling",
    "Transforms: unwrap",
    "Transforms: key",
    "Connector",
    "Data Conversion",
    "Proxy",
    "Security",
    "Kerberos",
    "Data Stream"
  ],
  "configs": [
    // ...
    {
      "definition": {
        "name": "connection.url",
        "type": "LIST",
        "required": true,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "The comma-separated list of one or more Elasticsearch URLs, such as ``http://eshost1:9200,http://eshost2:9200`` or ``https://eshost3:9200``. HTTPS is used for all connections if any of the URLs starts with ``https:``. A URL without a protocol is treated as ``http``.",
        "group": "Connector",
        "width": "LONG",
        "display_name": "Connection URLs",
        "dependents": [],
        "order": 1
      },
      "value": {
        "name": "connection.url",
        "value": "https://hm-elasticsearch-es-http.elastic:9200",
        "recommended_values": [],
        "errors": [
          "Could not connect to Elasticsearch. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
        ],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "connection.username",
        "type": "STRING",
        "required": false,
        "default_value": null,
        "importance": "MEDIUM",
        "documentation": "The username used to authenticate with Elasticsearch. The default is the null, and authentication will only be performed if  both the username and password are non-null.",
        "group": "Connector",
        "width": "SHORT",
        "display_name": "Connection Username",
        "dependents": [],
        "order": 2
      },
      "value": {
        "name": "connection.username",
        "value": "elastic",
        "recommended_values": [],
        "errors": [
          "Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
        ],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "connection.password",
        "type": "PASSWORD",
        "required": false,
        "default_value": null,
        "importance": "MEDIUM",
        "documentation": "The password used to authenticate with Elasticsearch. The default is the null, and authentication will only be performed if  both the username and password are non-null.",
        "group": "Connector",
        "width": "SHORT",
        "display_name": "Connection Password",
        "dependents": [],
        "order": 3
      },
      "value": {
        "name": "connection.password",
        "value": "[hidden]",
        "recommended_values": [],
        "errors": [
          "Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
        ],
        "visible": true
      }
    },
    // ...
  ]
}

The full validation response with all fields can be found here; it contains all the information about the available config fields.

UPDATE 1 (Dec 8, 2021):

I found this document which has more SSL-related configs:

"elastic.security.protocol": "SSL"
"elastic.https.ssl.keystore.location": "/path/to/keystore.jks"
"elastic.https.ssl.keystore.password": "xxx"
"elastic.https.ssl.key.password": "xxx"
"elastic.https.ssl.keystore.type": "JKS"
"elastic.https.ssl.truststore.location": "/path/to/truststore.jks"
"elastic.https.ssl.truststore.password": "xxx"
"elastic.https.ssl.truststore.type": "JKS"
"elastic.https.ssl.protocol": "TLS"

In my case, simply providing the username and password succeeds (with curl). But in the config above there is no place to provide the username, so I am not sure how to actually write it correctly.

UPDATE 2 (Dec 9, 2021):

I have these pods running

> kubectl get pods --namespace=elastic
NAME                                 READY   STATUS    RESTARTS   AGE
hm-kibana-kb-77d4d9b456-m6th9        1/1     Running   0          6d3h
hm-elasticsearch-es-default-0        1/1     Running   0          35h

and these secrets

kubectl get secrets --namespace=elastic
NAME                                             TYPE                                  DATA   AGE
hm-kibana-kibana-user                            Opaque                                1      6d3h
elastic-hm-kibana-kibana-user                    Opaque                                3      6d3h
hm-kibana-kb-http-ca-internal                    Opaque                                2      6d3h
hm-elasticsearch-es-http-ca-internal             Opaque                                2      6d3h
hm-elasticsearch-es-http-certs-internal          Opaque                                3      6d3h
hm-elasticsearch-es-http-certs-public            Opaque                                2      6d3h
hm-kibana-kb-es-ca                               Opaque                                2      6d3h
hm-kibana-kb-http-certs-internal                 Opaque                                3      6d3h
hm-kibana-kb-http-certs-public                   Opaque                                2      6d3h
hm-elasticsearch-es-transport-ca-internal        Opaque                                2      6d3h
hm-elasticsearch-es-transport-certs-public       Opaque                                1      6d3h
hm-elasticsearch-es-remote-ca                    Opaque                                1      6d3h
hm-elasticsearch-es-elastic-user                 Opaque                                1      6d3h
hm-elasticsearch-es-internal-users               Opaque                                3      6d3h
hm-elasticsearch-es-xpack-file-realm             Opaque                                3      6d3h
hm-elasticsearch-es-default-es-config            Opaque                                1      6d3h
hm-elasticsearch-es-default-es-transport-certs   Opaque                                3      6d3h
hm-kibana-kb-config                              Opaque                                2      6d3h

I was able to save ca.crt, tls.crt, and tls.key locally:

kubectl get secret hm-elasticsearch-es-http-certs-public \
  --namespace=elastic \
  --output=go-template='{{index .data "ca.crt" | base64decode }}' \
  > ca.crt
kubectl get secret hm-elasticsearch-es-http-certs-public \
  --namespace=elastic \
  --output=go-template='{{index .data "tls.crt" | base64decode }}' \
  > tls.crt
kubectl get secret hm-elasticsearch-es-http-certs-internal \
  --namespace=elastic \
  --output=go-template='{{index .data "tls.key" | base64decode }}' \
  > tls.key
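Before building a keystore out of these files, it is worth confirming that the extracted tls.crt and tls.key actually belong together. A minimal, self-contained sketch of that check (it generates a throwaway demo pair as stand-ins for the real files, so the commands can be run anywhere openssl is available):

```shell
# Generate a throwaway key/cert pair as stand-ins for the extracted tls.key/tls.crt
openssl req -x509 -newkey rsa:2048 -nodes -keyout demo.key -out demo.crt \
  -days 1 -subj "/CN=demo" 2>/dev/null
# A certificate and a private key match when their public-key moduli are identical
crt_mod=$(openssl x509 -noout -modulus -in demo.crt)
key_mod=$(openssl rsa -noout -modulus -in demo.key)
[ "$crt_mod" = "$key_mod" ] && echo "match"
```

Run against the real tls.crt/tls.key, a mismatch here would explain authentication failures long before touching Kafka Connect.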

Previously, I had to disable certificate verification with the -k flag for curl to succeed. Now, inside Kubernetes, I can succeed without that flag:

curl --request GET \
  --url https://hm-elasticsearch-es-http.elastic:9200 \
  --cacert ca.crt \
  --key tls.key \
  --cert tls.crt \
  --header 'Content-Type: application/json' \
  -u "elastic:passw0rd"

Besides, I succeeded in generating keystore.jks by

openssl pkcs12 -export \
  -in tls.crt \
  -inkey tls.key \
  -CAfile ca.crt \
  -caname root \
  -out keystore.p12 \
  -password pass:SFLzyT8DPkGGjDtn \
  -name hm-elasticsearch-keystore
keytool -importkeystore \
  -srckeystore keystore.p12 \
  -srcstoretype PKCS12 \
  -srcstorepass SFLzyT8DPkGGjDtn \
  -deststorepass MPx57vkACsRWKVap \
  -destkeypass MPx57vkACsRWKVap \
  -destkeystore keystore.jks \
  -alias hm-elasticsearch-keystore
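The intermediate PKCS#12 bundle can be verified with openssl alone before the keytool conversion (which needs a JDK). A self-contained sketch that builds a demo bundle the same way and confirms the export password opens it:

```shell
# Create a demo cert and bundle it into PKCS#12, mirroring the export above
openssl req -x509 -newkey rsa:2048 -nodes -keyout demo.key -out demo.crt \
  -days 1 -subj "/CN=demo" 2>/dev/null
openssl pkcs12 -export -in demo.crt -inkey demo.key \
  -out demo.p12 -password pass:SFLzyT8DPkGGjDtn -name demo-keystore
# If the password is correct, the certificate can be read back out of the bundle
openssl pkcs12 -in demo.p12 -passin pass:SFLzyT8DPkGGjDtn -nokeys -clcerts 2>/dev/null \
  | grep -c "BEGIN CERTIFICATE"
```

The same `openssl pkcs12 -in keystore.p12 -passin pass:…` check applies to the real keystore.p12 produced above.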

I am still not sure about the truststore from UPDATE 1, but for now I can fill in the keystore part:

"elastic.https.ssl.keystore.location": "/path/to/keystore.jks"
"elastic.https.ssl.keystore.password": "MPx57vkACsRWKVap"
"elastic.https.ssl.key.password": "MPx57vkACsRWKVap"
"elastic.https.ssl.keystore.type": "JKS"

However, my keystore.jks currently lives locally on my laptop, while my Elasticsearch and ElasticsearchSinkConnector run in Kubernetes.

Where does "elastic.https.ssl.keystore.location": "/path/to/keystore.jks" point to?

First, some more background: I deployed Kafka using Strimzi:

kubectl create namespace kafka
kubectl apply --filename="https://strimzi.io/install/latest?namespace=kafka" --namespace=kafka
kubectl apply --filename=https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml --namespace=kafka

Following UPDATE 2 in the question, once I had keystore.jks, I created the secret with

kubectl create secret generic hm-elasticsearch-keystore \
  --from-file=keystore.jks \
  --namespace=kafka

I have these files:

kafkaconnect.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-connect-cluster
  namespace: kafka
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: hongbomiao/hm-connect-debezium:latest
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-elasticsearch-keystore-volume
        secret:
          secretName: hm-elasticsearch-keystore
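This answers the keystore.location question from the original post: Strimzi mounts each externalConfiguration volume into the Connect pods at /opt/kafka/external-configuration/<volume-name>. A tiny sketch of how the path is derived from the volume name declared above:

```shell
# Strimzi convention: externalConfiguration volumes are mounted under
# /opt/kafka/external-configuration/<volume-name> inside the Connect pod
volume_name="hm-elasticsearch-keystore-volume"
echo "/opt/kafka/external-configuration/${volume_name}/keystore.jks"
```

So the keystore paths in the connector config below must use this mount path, not a laptop-local path.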

elasticsearch-sink-kafkaconnector.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: elasticsearch-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: hm-connect-cluster
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 1
  # https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html
  config:
    connector.class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
    tasks.max: "1"
    topics: "opa_db_server.public.roles"
    connection.url: "https://hm-elasticsearch-es-http.elastic:9200"
    connection.username: "elastic"
    connection.password: "passw0rd"
    transforms: "unwrap,key"
    transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
    transforms.unwrap.drop.tombstones: "false"
    transforms.key.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.key.field: "role_id"
    key.ignore: "false"
    behavior.on.null.values: "delete"
    elastic.security.protocol: "SSL"
    elastic.https.ssl.keystore.location: "/opt/kafka/external-configuration/hm-elasticsearch-keystore-volume/keystore.jks"
    elastic.https.ssl.keystore.password: "MPx57vkACsRWKVap"
    elastic.https.ssl.key.password: "MPx57vkACsRWKVap"
    elastic.https.ssl.keystore.type: "JKS"
    elastic.https.ssl.truststore.location: "/opt/kafka/external-configuration/hm-elasticsearch-keystore-volume/keystore.jks"
    elastic.https.ssl.truststore.password: "MPx57vkACsRWKVap"
    elastic.https.ssl.truststore.type: "JKS"
    elastic.https.ssl.protocol: "TLS1.3"

Note: here the keystore and the truststore share the same keystore.jks file.

Then I just need to run

kubectl apply --filename=kafkaconnect.yaml
kubectl apply --filename=elasticsearch-sink-kafkaconnector.yaml

Now it works!
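One way to confirm it is actually healthy is the Connect REST API's status endpoint (the same hm-connect-cluster-connect-api.kafka:8083 service used for validation earlier). The sketch below greps the states out of a sample response with the shape that endpoint returns; RUNNING for both the connector and every task means the sink is working:

```shell
# In the cluster you would fetch the real JSON with:
#   curl -s http://hm-connect-cluster-connect-api.kafka:8083/connectors/elasticsearch-sink-connector/status
# Sample response of the documented shape, used here so the check is runnable anywhere
status='{"name":"elasticsearch-sink-connector","connector":{"state":"RUNNING"},"tasks":[{"id":0,"state":"RUNNING"}]}'
echo "$status" | grep -o '"state":"[A-Z]*"' | sort -u
```

A single unique `"state":"RUNNING"` line means no connector or task is FAILED.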

After some research, I found a good way to solve this problem:

  1. Install cert-manager (https://cert-manager.io/docs/installation/helm/).
  2. Generate a self-signed issuer and certificate (with the extra keystores.jks option) via cert-manager:
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: es-ca-issuer
spec:
  selfSigned: {}
---
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: es-jks-cert
spec:
  secretName: es-jks-secret
  dnsNames:
    - <your es internal endpoint service name>
  issuerRef:
    name: es-ca-issuer
  keystores:
    jks:
      create: true
      passwordSecretRef:
        key: password
        name: jks-secret
---
apiVersion: v1
kind: Secret
metadata:
  name: jks-secret
type: Opaque
data:
  password: ZXhhbXBsZQ==
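One note on the Secret above: the password value in the data field must be base64-encoded, and ZXhhbXBsZQ== decodes to the literal string example (replace it with your own). The encoding can be reproduced with:

```shell
# base64-encode the plaintext keystore password for the Secret's data field
# (-n keeps echo from appending a trailing newline to the encoded value)
echo -n "example" | base64
```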
  3. Configure your Elasticsearch cluster to use your self-signed certificate:
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: es
spec:
  version: 8.9.1
  http:
    tls:
      certificate:
        secretName: es-jks-secret
  nodeSets:
  - name: default
    count: 1
    config:
      node.store.allow_mmap: false
  4. Prepare your Kafka Connect manifest to import the secret containing the JKS keystore:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-elastic
  labels:
    strimzi.io/cluster: <kafka cluster name>
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: <kafka-cluster-bootstrap-service>:9093
  tls:
    trustedCertificates:
      - secretName: <kafka-cluster-cert-secret>
        certificate: ca.crt
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: elastic-connect
    config.storage.topic: elastic-connect-configs
    offset.storage.topic: elastic-connect-offsets
    status.storage.topic: elastic-connect-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: <docker registry>
      pushSecret: <docker regcred secret name>
    plugins:
      - name: kafka-connect-elasticsearch
        artifacts:
          - type: maven
            group: io.confluent
            artifact: kafka-connect-elasticsearch
            version: 14.0.3
            repository: https://packages.confluent.io/maven/
  externalConfiguration:
    volumes:
      - name: es-jks-volume
        secret:
          secretName: es-jks-secret
  5. We use KubernetesSecretConfigProvider to pass secrets through to the final connector, so we need RBAC permissions that let the connect-elastic-connect service account access jks-secret and es-es-elastic-user:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["es-es-elastic-user", "jks-secret"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
subjects:
  - kind: ServiceAccount
    name: connect-elastic-connect
  - kind: ServiceAccount
    name: connect-postgres-connect
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
  6. After that, we are ready to deploy the Kafka Elasticsearch connector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: connector-elastic
  labels:
    strimzi.io/cluster: connect-elastic
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 1
  config:
    connector.class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
    topics: "..."
    connection.url: https://es-es-internal-http:9200
    connection.username: elastic
    connection.password: ${secrets:default/es-es-elastic-user:elastic}
    elastic.security.protocol: "SSL"
    elastic.https.ssl.keystore.location: "/opt/kafka/external-configuration/es-jks-volume/keystore.jks"
    elastic.https.ssl.keystore.password: ${secrets:default/jks-secret:password}
    elastic.https.ssl.keystore.type: "JKS"
    elastic.https.ssl.truststore.location: "/opt/kafka/external-configuration/es-jks-volume/truststore.jks"
    elastic.https.ssl.truststore.password: ${secrets:default/jks-secret:password}
    elastic.https.ssl.truststore.type: "JKS"
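The ${secrets:...} values above are not literals: io.strimzi.kafka.KubernetesSecretConfigProvider resolves placeholders of the form ${secrets:<namespace>/<secret-name>:<key>} at runtime, which is why the Role shown earlier must grant get on those secrets. A small sketch pulling one placeholder apart:

```shell
# Anatomy of a KubernetesSecretConfigProvider placeholder:
#   ${secrets:<namespace>/<secret-name>:<key>}
placeholder='${secrets:default/jks-secret:password}'
echo "$placeholder" | sed 's/^\${secrets:\(.*\):\(.*\)}$/secret=\1 key=\2/'
```

So the connector's connection.password resolves to the elastic key of the es-es-elastic-user secret, and both store passwords resolve to the password key of jks-secret.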
