数据异常:键必须是结构或映射



我正在尝试使用 KSQLdb 使用 Kafka Connect 配置ConfluentCassandra Sink 连接器

CREATE SINK CONNECTOR cassandra WITH(
"name" = 'CASSANDRA',
"connector.class" = 'io.confluent.connect.cassandra.CassandraSinkConnector',
"tasks.max" = '1',
"topics" = 'users',
"cassandra.contact.points" = 'cassandra',
"cassandra.keyspace" = 'test',
"confluent.topic.bootstrap.servers" = 'kafka:29092',
"confluent.topic.replication.factor" = '1',
"key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"transforms" = 'createKey,extractId',
"transforms.createKey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
"transforms.createKey.fields" = 'ID',
"transforms.extractId.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractId.field" = 'ID');

用户主题是一个 KSQLdb 表。

打印主题;结果是:

密钥格式:KAFKA_STRING

值格式:JSON 或 KAFKA_STRING

划行时间:2020/05/19 10:51:35.036 Z,键:P343434,值:{"ID":"P343434"}

例外:

Caused by: org.apache.kafka.connect.errors.DataException: Key must be a struct or map. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

异常提到键必须是结构或映射!! 我做了转换以给它一把钥匙,但问题仍然存在!

有没有办法对此进行故障排除或了解此 Cassandra 连接器所需的键/值格式

Confluent的Cassandra连接器有以下限制(从我的角度来看非常大(:

  1. 主题的键必须与 Cassandra 表的主键直接匹配。 所以在你的情况下,你的Cassandra表应该有主键,由一列类型为text。如果你有复合主键,那么你需要将你的主题转换为另一个主题,结构或映射与 Cassandra 中的主键匹配
  2. 主题的值应与表的"常规"列匹配(除主键之外的所有列(

这种限制导致中间主题等的扩散。

更灵活的解决方案可能是使用DataStax的Kafka Sink Connector:

  1. 它没有这样的限制 - 您可以定义如何将主题映射到表的字段中
  2. 它重量轻,性能高
  3. 与DSE和Cassandra合作
  4. 支持从单个主题写入多个表,无需创建中间主题(如Confluent版本要求的那样(

相关内容

  • 没有找到相关文章

最新更新