使用Kafka插入postgres数据库并返回状态消息,用队列处理csv文件



我正在寻找一些最佳实践/建议来处理CSV文件,以便使用队列机制(Kafka(插入数据库

所以我要做的是:

创建一个新的SQL表Service Request来存储用户请求的信息,如:

RequestID, Status, Payload, Response

正如您所看到的,我有字段status来指示请求是成功失败

以下是用户上传CSV文件时的流程:

  1. 用户提交CSV文件
  2. 验证CSV文件以确保它使用了正确的模板
  3. 将CSV文件上传到Google Cloud Storage,然后使用RequestID在Service Request表上创建一个新记录,有效载荷为CSV文件的URL
  4. 读取CSV文件上的所有记录,并将Queue发送到Kafka主题(带有JSON负载(

在消费者方面:

  1. 侦听主题的所有传入队列(使用队列(
  2. 正在处理所有队列
  3. 如果出现错误,请创建CSV文件以存储此队列失败的原因
  4. 如果RequestID XXX的所有队列都已完成,则更新status并使用CSV文件错误列表设置响应

所以问题是
我如何知道RequestID的所有队列XXX全部消耗完毕,我可以更新status吗?

我正在使用:Go+融合kafka-Go

更新

在做了一些研究之后,我发现它应该通过实现GroupByKey来使用Kafka Stream,在Go中可以这样做吗?我无法从合流kafka-go中找到kafka-stream api

我是卡夫卡的新手,所以我可能不是给出建议的最佳人选,但我最初的反应是强制进行消息处理"按顺序";。在生产者方面,你会指出最后一条信息。在消费者端,您会阅读指示器,一旦到达最后一条消息,就会更新Status字段。请记住,强制消息顺序可能会影响系统吞吐量。

有用的阅读资料可在https://medium.com/latentview-data-services/how-to-use-apache-kafka-to-guarantee-message-ordering-ac2d00da6c22

另一种方法是使用Postgres作为分布式锁并跟踪进度。例如,假设您有一个包含列的跟踪表:RequestIdRecordsProcessedRecordsGenerated。您可以锁定行或表,并在每次使用消息时递增RecordsProcessed列。处理完所有记录后,您将相应地更新Status

最新更新