将文件上传到Kafka并进一步处理



将上传文件的二进制数据发送给Kafka,然后由一些连接到Kafka主题的服务分发处理上传,这是一种好方法吗?

我看到了一些优势:

  • 过滤上传数据
  • 复制副本
  • 有些服务可以处理上传,而不仅仅是一个

你对此怎么看?

上传文件的二进制数据发送到Kafka,然后发送到分布式处理连接到的某些服务的上载卡夫卡主题

通常文件被上传到文件系统,它们的URI存储在Kafka消息中。这是为了确保Kafka消息的大小相对较小,从而提高其客户端的吞吐量

在这种情况下,如果我们在Kafka消息中放入大对象,则消费者将不得不读取整个文件。因此,您的poll()将比平时花费更长的时间。

另一方面,如果我们只放一个文件的URI,而不是文件本身,那么消息消耗将相对较快,并且您可以通过增加应用程序吞吐量,将文件处理委托给另一个线程(可能来自线程池(。

副本

正如Kafka中有副本一样,文件系统也可以有副本。甚至kafka也将消息存储在文件系统中(作为分段文件(。所以,复制也可以用文件系统本身来完成。

最好的方法是在Kafka中放置一个指向文件的URI消息,然后为该URI放置一个处理程序负责向您提供文件,并可能在原始文件被删除的情况下向您提供副本

处理程序可能与系统的其他部分松散耦合,专门用于管理文件、维护副本等。

过滤上传数据

只有当您实际读取文件内容时,才能对上传的数据进行过滤。您甚至可以将文件的URI放在消息中并从中读取。例如,如果您使用Kafka流,您可以将过滤逻辑放在transform()mapValues()等中

stream.from(topic)
.mapValues(v -> v.getFileURI())
.filter((k,fileURI) -> validate(read(fileURI)))
.to(..)

命中分段.bytes

在邮件中存储文件的另一个缺点是,如果文件较大,则可能会达到segment.bytes限制。每次都需要不断更改segment.bytes,以满足新的文件大小要求。

另一点是,如果segment.bytes设置为1GB,并且您的第一条消息(文件(大小为750MB,而您的下一条消息为251 MB,则251MB消息无法容纳在第一个段中,因此您的第一个段将只有一条消息,尽管它尚未达到限制。这意味着每个段存储的消息数量相对较少。

最新更新