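I wrote some code that retrieves thousands of String objects from a REST endpoint as a stream.
My problem is that the endpoint sends the data in pages. Each page has 1000 elements, and I don't know the number of pages in advance.
So I have to fetch a page as a stream -> check whether it is empty -> if it isn't, consume it. Otherwise, stop requesting more pages and move on to the next source.
I tried findFirst, findAny ... you name it. All of these consume the stream, so I can't use them to check whether the stream is empty: calling any of them effectively uses the stream up.
I also don't want to send the request twice: once to check whether the stream has elements, and then again to actually retrieve the stream. That would double the download size.
I came up with a crude solution using peek() and an AtomicBoolean. Is there a better way?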
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.stream.Stream;

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean found = new AtomicBoolean(false);
        for (String source : sources) {
            // incoming requests are retrieved page by page
            int page = 0;
            do {
                page++;
                found.set(false);
                try (
                    Stream<String> stringStream = consumer
                        // get the feed
                        .connectToEndpoint(page, source)
                        // I don't know whether the page will be empty when retrieving it,
                        // so I peek into the stream to see whether it has any elements.
                        // If it has none, I don't have to request the next page.
                        // ===================================================================
                        // The problem is that I have to peek at every incoming object.
                        // There is NO way to just look into the stream -> oh, it's not empty -> carry on.
                        // Even more problematic is the AtomicBoolean:
                        // it is slow, and I set it to true for every incoming object.
                        // ===================================================================
                        .peek(string -> found.set(true));
                    // I convert the string stream to some object stream and filter it,
                    // then do further processing
                ) {
                    // Then I take the stream, build a JSON request and write it to a file
                } catch (IOException e) {
                    // usually a server error -> see #connectToFeedEndpoint
                    e.printStackTrace();
                    page--;
                    // give the server some time to recover
                    TimeUnit.SECONDS.sleep(3);
                }
                System.out.printf("Source = %s, Page = %d, Found = %s%n", source, page, found.get());
                // This is where I check found.
                // The first page is likely to have elements, so I'm using a do...while.
                // At the end of each cycle I check whether found is true; if it is, I loop again.
            } while (found.get());
        }
    }
No. A stream does nothing until you invoke a terminal operation to "consume" it. Once it has been consumed, you can't use it again.
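As a small illustration of the "consumed once" rule, here is a minimal sketch using only the standard library. The class and method names are made up for this example, and the in-memory stream merely stands in for a page returned by the endpoint.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTerminalOperationFails() {
        // Stand-in for one page of results (assumption: not the real endpoint).
        Stream<String> page = Stream.of("a", "b", "c");

        // First terminal operation: after this call the stream counts as consumed.
        page.findFirst();

        try {
            // Second terminal operation on the same stream object.
            page.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            // The stream has already been operated upon, so it cannot be reused.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOperationFails());
    }
}
```

This is why an emptiness check with findFirst() or findAny() cannot be followed by normal processing of the same stream object.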
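The Java 8 API documentation says the following, and later versions have changed nothing on this point (emphasis mine):
"To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed."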
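Given that laziness, one possible workaround (not part of the answer above, just a sketch) is to consume the original stream through its iterator, ask the iterator whether a first element exists, and then build a new stream over the same iterator. The original stream is still consumed exactly once, so this does not contradict the rule. The class LazyPageCheck and the helper nonEmpty are hypothetical names, and the in-memory streams stand in for pages from the endpoint.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyPageCheck {

    // Hypothetical helper: consumes the original stream through its iterator and,
    // if at least one element exists, returns a NEW stream over that iterator.
    static <T> Optional<Stream<T>> nonEmpty(Stream<T> source) {
        Iterator<T> it = source.iterator();   // terminal operation on the original stream
        if (!it.hasNext()) {                  // pulls from the source only until an element is available
            source.close();
            return Optional.empty();
        }
        Spliterator<T> rest = Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        // The new stream still yields the first element: hasNext() buffered it, next() returns it.
        // Closing the new stream also runs the close handlers of the original one.
        return Optional.of(StreamSupport.stream(rest, false).onClose(source::close));
    }

    // Number of elements that passed through peek() before any terminal operation ran.
    static int seenBeforeTerminalOperation() {
        AtomicInteger seen = new AtomicInteger();
        Stream<String> page = Stream.of("a", "b", "c").peek(s -> seen.incrementAndGet());
        int count = seen.get();               // no terminal operation yet, so nothing was pulled
        page.close();
        return count;
    }

    public static void main(String[] args) {
        System.out.println(seenBeforeTerminalOperation());
        System.out.println(nonEmpty(Stream.<String>empty()).isPresent());
        List<String> all = nonEmpty(Stream.of("a", "b")).get().collect(Collectors.toList());
        System.out.println(all);
    }
}
```

In the paging loop from the question, the do...while condition could then depend on whether nonEmpty(...) returned a value, instead of setting an AtomicBoolean for every element. Whether this is worth the extra wrapping depends on how the real endpoint stream behaves, which is not shown here.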