如何将Doobie生成的FS2流发布到Kafka



我想发布一个长长的事件列表到Kafka消费一个fs2。流对应于一个非常大的数据库行列表,如果编译成列表,最终会导致内存不足错误。

那么,假设我有一个非常大的UUID键列表,其中包含数百万条记录:

def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]

,我想发布一个事件到Kafka对应的500个键使用这个Publisher:

trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}

我想创建一个函数来队列/发布这个流到Kafka:

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}

我如何消费由DB发起的流,将其分成恒定大小的块,然后将每个块发布到Kafka中?

显然很难找到关于这个主题的好的文档或示例。你能给我指个正确的方向吗?

既然你没有说ChunkOfKeys是什么类型,我就假设它是Chunk[UUID]之类的

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
xa: Transactor[IO],
publisher: KeyPublisher
): IO[Unit] =
getKeyStream(endDateTime)
.transact(xa) // Convert the ConnectionIO stream to Stream[IO, UUID]
.chunkN(500)  // into Stream[IO, Chunk[UUID]]
.evalMap(publisher.publish)  // Into Stream[IO, Long]
.compile
.drain // An IO[Unit] that describes the whole process

相关内容

  • 没有找到相关文章

最新更新