通过 spring-data 迭代 MongoDB 中的大型集合



朋友们!

我正在通过spring-data在java项目中使用MongoDB。我使用存储库接口来访问集合中的数据。对于某些处理,我需要遍历集合的所有元素。我可以使用存储库的 fetchAll 方法,但它总是返回 ArrayList。

但是,假设其中一个集合会很大 - 最多 100 万条记录,每个记录至少几千字节。我想我不应该在这种情况下使用 fetchAll,但我既找不到返回一些迭代器的方便方法(这可能允许部分获取集合),也找不到带有回调的方便方法。

我只看到支持在页面中检索此类集合。我想知道这是否是处理此类收藏的唯一方法?

回复较晚,但也许将来会帮助某人。Spring data 不提供任何 API 来包装 Mongo DB Cursor 功能。它find方法中使用它,但始终返回已完成的对象列表。选项是直接使用 Mongo API 或使用 Spring 数据分页 API,如下所示:

        final int pageLimit = 300;
        int pageNumber = 0;
        Page<T> page = repository.findAll(new PageRequest(pageNumber, pageLimit));
        while (page.hasNextPage()) {
            processPageContent(page.getContent());
            page = repository.findAll(new PageRequest(++pageNumber, pageLimit));
        }
        // process last page
        processPageContent(page.getContent());

UPD (!这种方法不足以处理大型数据集(请参阅布什评论@Shawn)在这种情况下,请直接使用 Mongo API。

由于这个问题最近被撞了,这个答案需要更多的爱!

如果使用 Spring 数据存储库接口,则可以声明一个返回 Stream 的自定义方法,它将由 Spring Data 使用游标实现:

import java.util.Stream;
public interface AlarmRepository extends CrudRepository<Alarm, String> {
    Stream<Alarm> findAllBy();
}

因此,对于大量数据,您可以流式传输它们并逐行处理,而不受内存限制。

见 https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#mongodb.repositories.queries

你仍然可以使用 mongoTemplate 来访问集合,只需使用 DBCursor:

     DBCollection collection = mongoTemplate.getCollection("boundary");
     DBCursor cursor = collection.find();        
     while(cursor.hasNext()){
         DBObject obj = cursor.next();
         Object object =  obj.get("polygons");
         ..
      ...
     }

使用 MongoTemplate::stream() 作为 DBCursor 最合适的 Java 包装器

另一种方式:

do{
  page = repository.findAll(new PageRequest(pageNumber, pageLimit));
  pageNumber++;
}while (!page.isLastPage());

检查新方法以处理每个文档的结果。

http://docs.spring.io/spring-data/mongodb/docs/current/api/org/springframework/data/mongodb/core/MongoTemplate.html#executeQuery-org.springframework.data.mongodb.core.query.Query-java.lang.String-org.springframework.data.mongodb.core.DocumentCallbackHandler-

您可能想尝试像这样的DBCursor方式:

    DBObject query = new BasicDBObject(); //setup the query criteria
    query.put("method", method);
    query.put("ctime", (new BasicDBObject("$gte", bTime)).append("$lt", eTime));
    logger.debug("query: {}", query);
    DBObject fields = new BasicDBObject(); //only get the needed fields.
    fields.put("_id", 0);
    fields.put("uId", 1);
    fields.put("ctime", 1);
    DBCursor dbCursor = mongoTemplate.getCollection("collectionName").find(query, fields);
    while (dbCursor.hasNext()){
        DBObject object = dbCursor.next();
        logger.debug("object: {}", object);
        //do something.
    }
迭代

大型集合的最佳方法是直接使用 Mongo API。我使用了下面的代码,它就像我的用例的魅力一样。
我不得不迭代超过 15M 条记录,其中一些记录的文档大小很大。
以下代码位于 Kotlin Spring Boot App (Spring Boot 版本:2.4.5)

fun getAbcCursor(batchSize: Int, from: Long?, to: Long?): MongoCursor<Document> {
    val collection = xyzMongoTemplate.getCollection("abc")
    val query = Document("field1", "value1")
    if (from != null) {
        val fromDate = Date(from)
        val toDate = if (to != null) { Date(to) } else { Date() }
        query.append(
            "createTime",
            Document(
                "$gte", fromDate
            ).append(
                "$lte", toDate
            )
        )
    }
    return collection.find(query).batchSize(batchSize).iterator()
}

然后,从服务层方法中,您可以继续在返回的游标上调用MongoCursor.next(),直到MongoCursor.hasNext()返回true。

一个重要的观察:请不要错过在"FindIterable"(MongoCollection.find()的返回类型)上添加batchSize。如果不提供批大小,光标将获取初始 101 条记录,之后将挂起(它会尝试一次读取所有剩余记录)。
对于我的方案,我使用批大小为 2000,因为它在测试期间提供了最佳结果。此优化的批大小将受记录的平均大小的影响。

这是 Java 中的等效代码(从查询中删除 createTime,因为它特定于我的数据模型)。

    MongoCursor<Document> getAbcCursor(Int batchSize) {
        MongoCollection<Document> collection = xyzMongoTemplate.getCollection("your_collection_name");
        Document query = new Document("field1", "value1");// query --> {"field1": "value1"}
        return collection.find(query).batchSize(batchSize).iterator();
    }

这个答案基于: https://stackoverflow.com/a/22711715/5622596

这个答案需要一点更新,因为PageRequest已经改变了它的构建方式。

话虽如此,这是我修改后的回复:

int pageNumber = 1;
//Change value to whatever size you want the page to have
int pageLimit = 100;
Page<SomeClass> page;
List<SomeClass> compondList= new LinkedList<>();
do{
    PageRequest pageRequest = PageRequest.of(pageNumber, pageLimit);
    
    page = repository.findAll(pageRequest);
    
    List<SomeClass> listFromPage = page.getContent();
    //Do something with this list example below
    compondList.addAll(listFromPage);
    pageNumber++;
  }while (!page.isLast());
//Do something with the compondList: example below
return compondList;
Du到

Mongo DB游标功能,如果你有很长的进程,你可能会丢失游标...

我建议使用分页:

        final int pageSize = 1000;
        var paging = Pageable.ofSize(pageSize);
        do {
            Page<T> page = repository.findAll(paging); // Retrieve page items
            page.forEach((item) -> this.processItem(item);); // Do item job
            
            // page++
            paging = page.nextPageable(); // If last: return Pageable.unpaged()
        }
        while (paging.isPaged()); // If last: Unpaged.isPaged() return false

对于存储库,有 2 个选项:

    // Use Spring Data Interface
    @Repository
    public interface YourDao extends PagingAndSortingRepository<T, ID> {
        // extends create this impl
        // Page<T> findAll(Pageable pageable);
    }
    // Or create your own Impl
    public class YourDaoImpl implements YourDao {
        @Override
        public Page<T> findAll(Pageable pageable) {
            final var query = new Query().with(pageable);
            var items = mongoTemplate.find(query, T.class);
            return PageableExecutionUtils.getPage(
                  items, 
                  pageable, 
                  () -> mongoTemplate.count(Query.of(query).limit(-1).skip(-1), T.class));
        }
    }

最新更新