我正在尝试将AWS MSK连接器设置为DynamoDB,但我不明白如何指定DunamoDB密钥。
connector.class=io.confluent.connect.aws.dynamodb.DynamoDbSinkConnector
table.name.format=test_sink_table_2
confluent.topic.bootstrap.servers="someserver"
tasks.max=1
topics=spk-enriched-stream
aws.dynamodb.endpoint=https://dynamodb.eu-west-3.amazonaws.com
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
confluent.topic.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
aws.dynamodb.region=eu-west-3
confluent.topic.sasl.mechanism=AWS_MSK_IAM
我的应用程序以TSV格式发送消息给Kafka。
当前我得到这个错误:
[Worker-0e5124fe718e9e914] Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The provided key element does not match the schema (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: N789DKVS6F25MUQJV9U356DG7BVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
那么我如何指定连接器来加载DynamoDB中的数据呢?我如何从value.converter中获得密钥?
当您使用ByteArrayConverter
或StringConverter
时,您将无法访问数据中的字段,因为它们没有结构。示例文档使用AvroConverter,它可以访问字段,因为Avro是结构化的…TSV在Kafka中不是推荐的格式,因为你没有简单的方法来解析"第一列"。作为任何字段/ID使用(您也不知道是否有任何空行)
如果您想使用数据中的字段插入到数据库中,您应该检查aws.dynamodb.pk.hash
和aws.dynamodb.pk.sort
的设置。如文档所述,散列键默认使用分区并按偏移量排序,但是这也需要您更改数据写入主题的方式。
https://docs.confluent.io/kafka-connectors/aws-dynamodb/current/configuration_options.html dynamodb-parameters
所以我们能够找到的唯一选择是将消息从TSV转换为JSON,然后设置PK DynamoDB键。