给定如下队列:
val queue: Queue[Int] = async.boundedQueue[Int](1000)
我想拉出这个队列并将其流到下游Sink中,最多100个块。
queue.dequeue.chunk(100).to(downstreamConsumer)
可以工作,但是如果我有101条消息,它不会清空队列。将会有1条信息剩余,除非另外99条被推入。我想尽可能多地从队列中取出,最多100个,以我的下游进程能处理的最快速度。
是否存在可用的组合子?
为此,您可能需要在退出队列时监视队列的大小。然后,如果大小达到0,您就不会等待更多的元素。实际上,您可以根据队列的大小实现批处理的elastic
大小。例如:
val q = async.unboundedQueue[String]
val deq:Process[Task,(String,Int)] = q.dequeue zip q.size
val elasticChunk: Process1[(String,Int), Vector[String]] = ???
val downstreamConsumer : Sink[Task,Vector[String]] = ???
deq.pipe(elasticChunk) to downstreamConsumer
我实际上用了一种不同的方法来解决这个问题。
scalaz-stream队列现在包含一个dequeueBatch
方法,允许队列中的所有值脱队,最多N个或块。