在运行时将分页包装到流中



下面是一个例子:

有几个服务返回一组项作为页面:

public Page<User> users(int offset, int limit);
public Page<Group> groups(int offset, int limit);

这些服务的消费者实现了在他们这边处理分页的逻辑。它并不复杂,但处理起来仍然容易出错且耗时。

我试着把这个逻辑包装在里面:

public <T> Stream<T> streamOf(final BiFunction<Integer, Integer, PageResource<T>> pageResourceFunction);

pageResourceFunction接受offsetlimit作为输入参数,返回元素页面。

消费者应该得到一个流:

Stream<User> users = streamOf(service::users);

实际用户尚未加载。

  1. 在Java中是可能的吗?构造一个流惰性(从streamOf返回它)与流/元素的数量是未知的提前?
  2. 如果第一个选项不可能,我可以简化一个任务。通常Page包含元素的总数,所以我可以在streamOf中获得第一个页面,确定元素的总数并计算存在多少个页面。但即使这样简化也不能帮助我实现streamOf

您可以使用未知大小的Streams,但是您必须知道到达最后一个元素的时间。这可以通过多种方式由用户完成,但对于本例,我假设如果偏移量超过最后一个元素,用户的pageResourceFunction实现将返回null

Java 9及以上版本

Java 9引入了Stream.takeWhile(Predicate),当一个元素与Predicate不匹配时,它将使Stream短路。在这个解决方案中,我们希望从pageResourceFunction中获取页面,而不返回null。我们将takeWhile应用于offsets的无限Stream,生成所需的页面:

private static final int LIMIT = 4;
public static <T> Stream<T> streamOf(final BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
return IntStream.iterate(0, i -> i + LIMIT)  // Creates an infinite Stream with elements 0, 4, 8, 12, ...
.mapToObj(offset -> pageResourceFunction.apply(offset, LIMIT))
.takeWhile(Objects::nonNull)
.flatMap(page -> page.getElements().stream());
}

Java 8

在Java 8中,我们必须制作自己的IteratorSpliterator,当pageResourceFunction不返回null时,它接受元素,然后使用StreamSupport将其包装成Stream。在Java 8中复制Stream.takeWhile之前已经在下面的问题中得到了回答,但我还将提供我的实现:

  • 通过谓词
  • 限制流
  • 使用Java 8 Lambdas从列表中选取元素,直到满足条件为止

我将在本例中使用Spliterator,因为它为Stream提供了更多关于如何使用Spliterator.characteristics()等方法优化其执行的提示:Spliterator将从pageResourceFunction向前推进Page,直到pageResourceFunction返回null

最后,使用StreamSupport.stream(Spliterator, boolean)Spliterator包装成Stream。流总是惰性求值,只有在调用终端操作(例如Stream.collect())时才完全求值。

import lombok.Value;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class PageProviderJava8 {
private static final int LIMIT = 4;
public static <T> Stream<T> streamOf(final BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
class PageSpliterator implements Spliterator<Page<T>> {
private int offset = 0;
@Override
public boolean tryAdvance(Consumer<? super Page<T>> action) {
Page<T> page = pageResourceFunction.apply(offset, LIMIT);
if (page == null) {
return false; // Last page is reached, stop the Spliterator
}
offset += LIMIT;
action.accept(page);
return true;
}
@Override
public Spliterator<Page<T>> trySplit() {
return null; // Should return non-null values if you want to use parallel streams
}
@Override
public long estimateSize() {
return Long.MAX_VALUE; // The size is unknown, so we return Long.MAX_VALUE
}
/*
* Must be ordered, otherwise elements are requested from multiple pages,
* which this Spliterator does not support.
* Other characteristics depend on the implementation of Page.elements
*/
@Override
public int characteristics() {
return ORDERED;
}
}
return StreamSupport.stream(new PageSpliterator(), false)
.flatMap(page -> page.getElements().stream());
}
// POJOs
@Value
public static class Page<T> {
Collection<T> elements;
}
@Value
public static class User {
int id;
}
public static class PageProviderJava8Test {
// Creates a Page of Users with ids offset to offset + limit. Will return null after 20 Users.
public static Page<User> users(int offset, int limit) {
if (offset >= 20) {
return null;
}
Collection<User> elements = IntStream.range(offset, offset + limit)
.mapToObj(User::new)
.collect(Collectors.toList());
return new Page<>(elements);
}
@Test
public void testPages() {
List<User> users = PageProviderJava8.streamOf(PageProviderJava8Test::users)
.collect(Collectors.toList());
Assertions.assertThat(users)
.hasSize(20);
Assertions.assertThat(users.get(0).getId()).isEqualTo(0);
Assertions.assertThat(users.get(19).getId()).isEqualTo(19);
}
}
}

这是选项#2的解决方案,对于第一个选项,无法找到一个。

希望对别人有帮助。

private static final int LIMIT = 5;
public <T> Stream<T> streamOf(final BiFunction<Integer, Integer, PageResource<T>> pageResourceFunction) {
PageResource<T> firstPage = pageResourceFunction.apply(0, LIMIT);
return Stream.concat(
firstPage.getContent().stream(),
IntStream.range(1, numberOfPages(firstPage.getTotalElements()))
.mapToObj(numberOfPage ->
pageResourceFunction.apply(
numberOfPage * LIMIT, LIMIT))
.map(PageResource::items)
.flatMap(List::stream));
}
private int numberOfPages(final long total) {
return (total % LIMIT == 0)
? (int) total / LIMIT
: (int) total / LIMIT + 1;
}

假设原始页面资源函数接受附加参数:

PageResource<Node> subNodes(
final int parentId,
final int someAdditionalId,
final int offset,
final int limit);

消费者:

Stream<Node> stream = streamOf((offset, limit) -> 
service.subNodes(111, 222, offset, limit));

最新更新