如何使用 kafka connect 将 json 消息从 Kinesis 发送到 MSK,然后再发送到弹性搜索



我已经准备好了,流程也在工作。 我正在使用 lambda 函数将数据从 Kinesis 流发送到 MSK,消息格式如下

{
"data": {
"RequestID":    517082653,
"ContentTypeID":        9,
"OrgID":        16145,
"UserID":       4,
"PromotionStartDateTime":       "2019-12-14T16:06:21Z",
"PromotionEndDateTime": "2019-12-14T16:16:04Z",
"SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
},
"metadata":     {
"timestamp":    "2019-12-29T10:37:31.502042Z",
"record-type":  "data",
"operation":    "insert",
"partition-key-type":   "schema-table",
"schema-name":  "dbo",
"table-name":   "TRFSDIQueue"
}
}

我正在发送到 kafka 主题的这条 json 消息如下所示

props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("producer.type", "async");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
try {
producer.send(
new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
Thread.sleep(1000);
System.out.println("Message sent successfully");
} catch (Exception e) {
System.out.println("------------Exception message=-------------" + e.toString());
}
finally {
producer.flush();
producer.close();
}

当我启动 kafka 连接以进行弹性搜索时,出现类似错误

DataException: Converting byte[] to Kafka Connect data failed due to serialization error

此外,我还修改了快速入门-弹性搜索属性并将键值序列化程序更改为字符串。

当它是json时,它抛出错误。

我可以看到索引是在弹性搜索中使用 kafka 主题名称创建的,但没有记录.

所以请帮助我解决我的一些困惑. 1. 我是否从生产者 kinesis 流正确发送消息? 我正在使用

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

或者我应该在这里使用 JSON .但是没有这样的json.

  1. 或者我必须在quickstart-elasticsearch.properties中使用 json 序列化程序吗?

  2. 如果事件是插入的,那么它将在弹性搜索中插入记录呢删除和更新,Kafka-connect在弹性搜索中为我们句柄删除和更新呢?

提前致谢

对于 30 天免费试用,您可以使用 Kinesis 源连接器,或者您可以学习如何编写自己的源连接器并将其与 Elasticsearch 接收器一起部署,而不是根本不使用 lambda。


其次,逆向工作。您可以创建一个虚假主题并将相同格式的记录发送到lambda 之外吗?这些最终会落入卡夫卡吗?Elasticsearch怎么样?如果您正在使用 Kibana 并且它不起作用,也可以从等式中删除 Kibana

然后专注于 lambda 集成


回答您的问题

1( 您以字符串形式发送 JSON。你不需要单独的 JSON 序列化程序,除非你要发送映射到序列化程序接口中 JSON 字符串的 POJO 类。

您正在发送JSON记录,因此您应该在Connect中使用JSONConverter,是的。但是,我不认为 Elasticsearch 映射会自动创建,除非您有模式和有效负载,因此简单的解决方法是提前创建 ES 索引映射(但是,如果您已经知道这一点,那么您已经设计了一个模式,因此最终是生产者代码的责任发送正确的记录(。

如果提前定义映射,则应该能够在 Connect 中使用 StringConverter。

关于您的生产者代码,我唯一要更改的是重试次数高于 0。并使用资源尝试,而不是显式关闭生产者

//... parse input 
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
//... send record 
} 

2(您可以搜索连接器的Github问题,但最后我检查了一下,它会进行完整的文档更新和插入,没有部分更新或任何删除

相关内容

  • 没有找到相关文章

最新更新