Scalaz-stream chunking UP to N



给定如下队列:

val queue: Queue[Int] = async.boundedQueue[Int](1000)

我想拉出这个队列并将其流到下游Sink中,最多100个块。

queue.dequeue.chunk(100).to(downstreamConsumer) 

可以工作,但是如果我有101条消息,它不会清空队列。将会有1条信息剩余,除非另外99条被推入。我想尽可能多地从队列中取出,最多100个,以我的下游进程能处理的最快速度。

是否存在可用的组合子?

为此,您可能需要在退出队列时监视队列的大小。然后,如果大小达到0,您就不会等待更多的元素。实际上,您可以根据队列的大小实现批处理的elastic大小。例如:

val q = async.unboundedQueue[String]
val deq:Process[Task,(String,Int)] = q.dequeue zip q.size
val elasticChunk: Process1[(String,Int), Vector[String]] = ???
val downstreamConsumer : Sink[Task,Vector[String]] = ???
deq.pipe(elasticChunk) to downstreamConsumer

我实际上用了一种不同的方法来解决这个问题。

scalaz-stream队列现在包含一个dequeueBatch方法,允许队列中的所有值脱队,最多N个或块。

https://github.com/scalaz/scalaz-stream/issues/338

相关内容

最新更新