我们使用Kafka作为微服务之间的消息传递系统。我们有一个kafka消费者监听一个特定的主题,然后将数据发布到另一个主题,kafka连接器负责将数据发布到一些数据存储中。
我们使用Apache Avro作为序列化机制。
我们需要启用DLQ来为Kafka Consumer和Kafka Connector添加容错功能。
由于多种原因,任何消息都可能移动到DLQ:
<<ol>对于上面的第3点和第4点,我们想再次尝试来自DLQ的消息。
什么是相同的最佳实践。请建议。
只将导致不可重试错误的记录推送到DLQ,即:示例中的点1(错误格式)和点2(错误数据)。对于DLQ记录的格式,一个好的方法是:
- 向DLQ推送与原始记录完全相同的kafka记录值和密钥,不要将其封装在任何信封中。这使得在故障排除期间使用其他工具(例如使用新版本的反序列化器或其他工具)进行重新处理更加容易。
- 添加一堆Kafka头来通信关于错误的元数据,一些典型的例子是:
- 该记录的原始主题名称、分区、偏移量和Kafka时间戳
- 异常或错误信息
- 处理该记录失败的应用程序的名称和版本
- 错误时间
通常,我对每个服务或应用程序使用一个DLQ主题(不是每个入站主题一个DLQ主题,也不是跨服务共享DLQ主题)。这往往使事情保持独立和易于管理。
哦,您可能希望对DLQ主题的入站流量设置一些监视和警报;)
第3点(高音量)应该处理某种自动缩放,而不是DLQ。尝试总是高估(一点)输入主题的分区数量,因为您可以启动服务的最大实例数量受此限制。过多的消息不会使你的服务过载,因为Kafka的消费者在他们决定的时候会显式地轮询更多的消息,所以他们永远不会要求超过应用程序可以处理的数量。如果消息达到峰值,会发生什么呢?它们会继续堆积在上游kafka主题中。
应该直接从源主题重新尝试点4(连通性),而不涉及任何DLQ,因为错误是暂时的。将消息丢弃到DLQ并捡起下一个消息并不能解决任何问题,因为连通性问题仍然存在,并且下一个消息也可能被丢弃。读取或不读取Kafka的记录不会使它消失,因此存储在那里的记录很容易稍后再次读取。你可以编程你的服务,只有当它成功地将结果记录写入出站主题时,才能向前移动到下一个入站记录(参见Kafka事务:读取主题实际上涉及到写操作,因为新的消费者偏移量需要被持久化,所以你可以告诉你的程序持久化新的偏移量和输出记录作为同一个原子事务的一部分)。
Kafka更像是一个存储系统(只有两个操作:顺序读和顺序写),而不是一个消息队列,它擅长持久化、数据复制、吞吐量、规模……(...和炒作;))。它往往非常适合将数据表示为事件序列,如"事件来源"。如果这种微服务设置的需求主要是异步点对点消息传递,并且如果大多数场景更喜欢超低延迟,并选择丢弃消息而不是重新处理旧消息(正如所列出的4点所建议的那样),那么像Redis队列这样的有损内存队列系统可能更合适?关于在DLQ中重试消息,您可以将Lambda附加到它并将重试逻辑放入其中。不理想,因为我更喜欢原始的答案,但如果你像我一样承受着沉重的负载,我们不能依赖于我们正在调用的web服务,所以很多消息都是暂时的,必须在24小时后重试,等等。
我们还在原始SQS上使用redrive策略,在20分钟后重试最多4次。下面是一个如何根据我们的Terraform脚本进行设置的示例:
resource "aws_sqs_queue" "backoffice_dlq_queue" {
name = "backoffice-dlq-queue.fifo"
fifo_queue = true
content_based_deduplication = true
deduplication_scope = "queue"
receive_wait_time_seconds = 10
message_retention_seconds = 14 * 86400 // 14 days
kms_master_key_id = aws_kms_key.sns_kms_key.id
tags = local.common_tags
}
resource "aws_sqs_queue" "backoffice_queue" {
depends_on = [aws_sqs_queue.backoffice_dlq_queue]
name = "backoffice-queue.fifo"
fifo_queue = true
content_based_deduplication = true
deduplication_scope = "queue"
message_retention_seconds = 172800
visibility_timeout_seconds = 300
receive_wait_time_seconds = 10
kms_master_key_id = aws_kms_key.sns_kms_key.id
tags = local.common_tags
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.backoffice_dlq_queue.arn
maxReceiveCount = 4
})
redrive_allow_policy = jsonencode({
redrivePermission = "byQueue",
sourceQueueArns = ["${aws_sqs_queue.backoffice_dlq_queue.arn}"]
})
}