假设我有多个有序的迭代器。如果我想合并这些迭代器,同时使用monix对它们进行全局排序(例如[(1,3,4), (2,4,5)] -> [1,2,3,4,4,5]
),我该怎么做?
不使用Monix,但我不确定这是否与相关
import scala.collection.BufferedIterator
def merge[A:Ordering](xs: Seq[Iterator[A]]) =
new Iterator[A] {
val its = xs.map(_.buffered)
def hasNext = its.exists(_.hasNext)
def next = its.filter{ _.hasNext}
.minBy(_.head)
.next
}
val ys = merge(Seq(List(1,3,5).toIterator, List(2,4,6).toIterator, List(10,11).toIterator))
ys.toList //> res0: List[Int] = List(1, 2, 3, 4, 5, 6, 10, 11)
由于可观察是一个项目流,它可以概括为两种类型:
- 有限的流
- 无限流
请注意,为了正确排序,您需要所有项目。所以,没有简单的方法可以做到这一点。
对于有限流,您必须累积所有项,然后进行排序。您可以使用Observable.fromIterable
将其转化为可观察项。
val items = List((1,3,4), (2,4,5))
val sortedList = Observable
.fromIterable(items)
.flatMap(item => Observable.fromIterable(List(item._1, item._2, item._3)))
.toListL // Flatten to an Observable[Int]
.map(_.sorted)
对于无限流,您唯一能做的就是将项目缓冲到一定的时间或大小。我看不出有什么办法,因为你不知道小溪什么时候会结束。
例如,
val itemsStream: Observable[(Int, Int, Int)] = ???
itemsStream
.bufferIntrospective(10)
.flatMap((itemList: List[(Int, Int, Int)]) => // You'll have to sort this
???
)
有点晚了,但我需要合并排序monix observables,但找不到解决方案,这就是我解决的方法。
其想法是在列表的两个head
上使用bufferWhile
,这样您就可以将实际值放在手边,并检查其中哪个值的较小
private def mergeSortInternal(aHead: Observable[Int], bHead: Observable[Int], aTail: Observable[Int], bTail: Observable[Int]): Observable[Int] = {
(aHead.isEmpty ++ bHead.isEmpty).bufferWhile(_ => true).map {
isEmptyPair: Seq[Boolean] =>
val Seq(aIsEmpty, bIsEmpty) = isEmptyPair
if (aIsEmpty) {
bHead ++ bTail
}
else if (bIsEmpty) {
aHead ++ aTail
}
else {
{
(aHead ++ bHead).bufferWhile(_ => true).map((heads: Seq[Int]) => {
val Seq(aHeadValue, bHeadValue) = heads
if (aHeadValue < bHeadValue) {
Observable(aHeadValue) ++ mergeSortInternal(aTail.head, bHead, aTail.tail, bTail)
}
else {
Observable(bHeadValue) ++ mergeSortInternal(aHead, bTail.head, aTail, bTail.tail)
}
}).flatten
}
}
}.flatten
}
private def mergeSort(a: Observable[Int], b: Observable[Int]) = {
mergeSortInternal(a.head, b.head, a.tail, b.tail)
}