使用Kafka获取CSV数据,然后让多个消费者订阅并创建新的CSV文件,这些文件是原始文件的子集



我在看Robin Moffatt的一个视频(https://rmoff.net/2020/06/17/loading-csv-data-into-kafka/)我相信Apache Kafka可能会帮助我实现工作流程的自动化。

我有一个要求,我需要从客户那里获取CSV,以各种格式(文本或CSV(向两个供应商发送原始信息的子集,从这些供应商那里接收回数据,然后合并所有数据。

我有点像卡夫卡的核心,但我想我会有一个如下的过程:

将客户的数据输入kafka并保存到SQL Server或Postgres数据库中。然后我将发表2";我们有数据";流。每个流基本上都有一行,代表我们从客户那里收到的批次。这些主题流将由kafkaJS消费者使用。使用消息中的信息,这些使用者基本上将根据该供应商所需的输出从数据库中选择数据。

在这个过程的这一点上,我们预计会有两个回应。当每个响应进入(SFTP(时,我们将把响应文件(JSON或CSV(接收到数据库中,就像处理原始客户信息一样。如果我们已经收到所有数据,我们将发布另一条消息,该消息将由合并所有数据的消费者使用。

像罗宾这样的卡夫卡忍者有什么建议吗?非常感谢。

GD-

最具伸缩性的方法可能是创建csv文件的读取流,并在区块读取流上(就像迭代文件的值一样(,通过KafkaJS生成消息。

https://www.digitalocean.com/community/tutorials/how-to-read-and-write-csv-files-in-node-js-using-node-csv这篇文章展示了流媒体。on("数据"(部分是您处理流的地方;操作它,保存到db,生成到Kafka,这些都是有效的。

对于Kafka的正确设置,我会选择一个好的库。对于NodeJS,这是kafka-js>=2.0.0(以下版本存在并发问题(。

通过初始化消费者和生产者,您将在服务中有效地创建一个Kafka基础设施,它可以很好地服务于基于消息传递的微服务架构系统。

我在这里阐述的大部分概念都有很棒的教程,你只需要使用正确的库,并了解流将如何与你的代码逻辑一起工作。我只是提供了一个配方。

希望它能有所帮助!

相关内容

  • 没有找到相关文章

最新更新