我的需求是在Kafka集群中创建一个动态资源连接器。下面是我的connector.tf";文件:
resource "confluent_connector" "source" {
environment {
id = confluent_environment.staging.id
}
kafka_cluster {
id = confluent_kafka_cluster.dedicated.id
}
config_sensitive = {
"salesforce.password" : var.source_salesforce_password,
"salesforce.password.token" : var.source_salesforce_password_token,
"salesforce.consumer.key" : var.source_salesforce_consumer_key,
"salesforce.consumer.secret" : var.source_salesforce_consumer_secret
}
config_nonsensitive = {
"connector.class" : "SalesforceCdcSource",
"kafka.auth.mode" : "KAFKA_API_KEY",
"salesforce.cdc.name" : "AccountChangeEvent",
"kafka.api.key" : confluent_api_key.app-manager-kafka-api-key.id,
"kafka.api.secret" : confluent_api_key.app-manager-kafka-api-key.secret,
"salesforce.instance" : var.source_salesforce_url,
"salesforce.username" : var.source_salesforce_username,
for_each = { for s in var.source_salesforce_connector_name : s.source_salesforce_connector_name => s },
"name" : each.value["source_salesforce_connector_name"],
"kafka.topic" : each.value["source_salesforce_topic_name"],
"output.data.format" : each.value["source_salesforce_data_format"],
"tasks.max" : each.value["source_salesforce_max_task"]
}
depends_on = [
confluent_kafka_topic.topic
]
lifecycle {
prevent_destroy = false
}
}
变量声明如下:Variable .tf文件
variable "source_salesforce_connector_name" {
type = list(map(string))
default = [{
"source_salesforce_connector_name" = "SalesforceCdcSourceConnector_0_TF"
}]
}
我从.tfvars运行这个执行文件:
source_salesforce_connector_name = [
{
source_salesforce_connector_name = "SalesforceCdcSourceConnector_1_TF"
source_salesforce_topic_name = "json-topic-1"
source_salesforce_data_format = "JSON"
source_salesforce_max_task = "1"
},
]
执行时出现以下错误,请建议如何传递for_each
我尝试了上面的步骤和执行,但是得到以下错误:
terraform plan -var-file="DEV/DEV.tfvars"
Error: each.value cannot be used in this context
on modulesconfluent_kafka_cluster_dedicatedsource_connector_salesforce_cdc.tf line 27, in resource "confluent_connector" "source":
27: "name" : each.value["source_salesforce_connector_name"],
28: "kafka.topic" : each.value["source_salesforce_topic_name"],
29: "output.data.format" : each.value["source_salesforce_data_format"],
30: "tasks.max" : each.value["source_salesforce_max_task"],*
A reference to "each.value" has been used in a context in which it unavailable, such as when
the configuration no longer contains the value in its "for_each" expression. Remove this
reference to each.value in your configuration to work around this error.
如果您想要基于var.source_salesforce_connector_name
的多个confluent_connector
资源,那么您的for_each
应该在config_nonsensitive
之外:
resource "confluent_connector" "source" {
for_each = { for s in var.source_salesforce_connector_name : s.source_salesforce_connector_name => s },
environment {
id = confluent_environment.staging.id
}
kafka_cluster {
id = confluent_kafka_cluster.dedicated.id
}
config_sensitive = {
"salesforce.password" : var.source_salesforce_password,
"salesforce.password.token" : var.source_salesforce_password_token,
"salesforce.consumer.key" : var.source_salesforce_consumer_key,
"salesforce.consumer.secret" : var.source_salesforce_consumer_secret
}
config_nonsensitive = {
"connector.class" : "SalesforceCdcSource",
"kafka.auth.mode" : "KAFKA_API_KEY",
"salesforce.cdc.name" : "AccountChangeEvent",
"kafka.api.key" : confluent_api_key.app-manager-kafka-api-key.id,
"kafka.api.secret" : confluent_api_key.app-manager-kafka-api-key.secret,
"salesforce.instance" : var.source_salesforce_url,
"salesforce.username" : var.source_salesforce_username,
"name" : each.value["source_salesforce_connector_name"],
"kafka.topic" : each.value["source_salesforce_topic_name"],
"output.data.format" : each.value["source_salesforce_data_format"],
"tasks.max" : each.value["source_salesforce_max_task"]
}
depends_on = [
confluent_kafka_topic.topic
]
lifecycle {
prevent_destroy = false
}
}