如何将 RxJava Observable<Map<String> 的发射<Integer>拆分为多个?



我正试图将一个RxJava.Observable<Map<String, List<Integer>>的排放量拆分为多个RxJava.Observable<Map<String, List<Integer>>。每个发射都应包括原始列表的一个分区,所有分区都按原始顺序排列。保证每个列表具有相同的大小和(list.size() % partitionSize) == 0

示例:

{ 
"a": [0, 1, 2, 3, 4, 5, 6, 7, 8], 
"b": [0, 1, 2, 3, 4, 5, 6, 7, 8] 
}

// First emission
{ 
"a": [0,1,2], 
"b": [0,1,2] 
}
// Second emission
{ 
"a": [3,4,5], 
"b": [3,4,5] 
}
// Third emission
{ 
"a": [6,7,8], 
"b": [6,7,8] 
}

提前感谢!

我会使用streams来完成该工作。一种方法是:

Map<String, List<Integer>> map = Map.of(
"a", List.of(0, 1, 2, 3, 4, 5, 6, 7, 8),
"b", List.of(0, 1, 2, 3, 4, 5, 6, 7, 8));
int size = map.values().stream().mapToInt(list -> list.size()).max().getAsInt();
int partitionSize = IntStream.iterate(2, i -> i + 1)
.limit(size)
.filter(i -> size % i == 0)
.findFirst()
.orElse(1);
int splitSize = Math.round(size / partitionSize);
List<Map<String, List<Integer>>> maps = IntStream.range(0, partitionSize)
.mapToObj(i -> map.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, 
entry -> entry.getValue().subList(i * splitSize, (i + 1) * splitSize))))
.collect(Collectors.toList());
maps.forEach(System.out::println);

输出:

{a=[0, 1, 2], b=[0, 1, 2]}
{a=[3, 4, 5], b=[3, 4, 5]}
{a=[6, 7, 8], b=[6, 7, 8]}

注意:我假设您不知道partitionSize,所有Lists既不是null也不是空的,并且都具有相同的大小和list.size() > 1

您应该为给定参数partitionSize的每个列表buffer(size),然后从键集中的每个元素为每个发出的缓冲列表进行映射。

fromIterable(map.entries).flatMap { entry ->
fromIterable(entry.value).buffer(Math.min(entry.value.size, partionSize))
}.flatMap {
fromIterable(map.keys).flatMap { key -> just(key to it) }
}.subscribe(::println)

输出:

(a, [0, 1, 2])
(b, [0, 1, 2])
(a, [3, 4, 5])
(b, [3, 4, 5])
(a, [6, 7, 8])
(b, [6, 7, 8])
(a, [0, 1, 2])
(b, [0, 1, 2])
(a, [3, 4, 5])
(b, [3, 4, 5])
(a, [6, 7, 8])
(b, [6, 7, 8])

最新更新