我正在寻找一些最佳实践/建议来处理CSV文件,以便使用队列机制(Kafka(插入数据库
所以我要做的是:
创建一个新的SQL表Service Request
来存储用户请求的信息,如:
RequestID, Status, Payload, Response
正如您所看到的,我有字段status
来指示请求是成功或失败
以下是用户上传CSV文件时的流程:
- 用户提交CSV文件
- 验证CSV文件以确保它使用了正确的模板
- 将CSV文件上传到Google Cloud Storage,然后使用RequestID在
Service Request
表上创建一个新记录,有效载荷为CSV文件的URL - 读取CSV文件上的所有记录,并将Queue发送到Kafka主题(带有JSON负载(
在消费者方面:
- 侦听主题的所有传入队列(使用队列(
- 正在处理所有队列
- 如果出现错误,请创建CSV文件以存储此队列失败的原因
- 如果RequestID XXX的所有队列都已完成,则更新
status
并使用CSV文件错误列表设置响应
所以问题是:
我如何知道RequestID的所有队列XXX全部消耗完毕,我可以更新status
吗?
我正在使用:Go+融合kafka-Go库
更新
在做了一些研究之后,我发现它应该通过实现GroupByKey
来使用Kafka Stream,在Go中可以这样做吗?我无法从合流kafka-go中找到kafka-stream api
我是卡夫卡的新手,所以我可能不是给出建议的最佳人选,但我最初的反应是强制进行消息处理"按顺序";。在生产者方面,你会指出最后一条信息。在消费者端,您会阅读指示器,一旦到达最后一条消息,就会更新Status
字段。请记住,强制消息顺序可能会影响系统吞吐量。
有用的阅读资料可在https://medium.com/latentview-data-services/how-to-use-apache-kafka-to-guarantee-message-ordering-ac2d00da6c22
另一种方法是使用Postgres作为分布式锁并跟踪进度。例如,假设您有一个包含列的跟踪表:RequestId
、RecordsProcessed
、RecordsGenerated
。您可以锁定行或表,并在每次使用消息时递增RecordsProcessed
列。处理完所有记录后,您将相应地更新Status
。