我想发布一个长长的事件列表到Kafka消费一个fs2。流对应于一个非常大的数据库行列表,如果编译成列表,最终会导致内存不足错误。
那么,假设我有一个非常大的UUID键列表,其中包含数百万条记录:
def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]
,我想发布一个事件到Kafka对应的块500个键使用这个Publisher:
trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}
我想创建一个函数来队列/发布这个流到Kafka:
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}
我如何消费由DB发起的流,将其分成恒定大小的块,然后将每个块发布到Kafka中?
显然很难找到关于这个主题的好的文档或示例。你能给我指个正确的方向吗?
既然你没有说ChunkOfKeys
是什么类型,我就假设它是Chunk[UUID]
之类的
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
xa: Transactor[IO],
publisher: KeyPublisher
): IO[Unit] =
getKeyStream(endDateTime)
.transact(xa) // Convert the ConnectionIO stream to Stream[IO, UUID]
.chunkN(500) // into Stream[IO, Chunk[UUID]]
.evalMap(publisher.publish) // Into Stream[IO, Long]
.compile
.drain // An IO[Unit] that describes the whole process