优化Spring Boot中的数据提取和插入



我在CSV文件中有270000条记录,列为user_id、book_ISBN和book_rating,我需要将这些记录插入到一个多对多表中。我用openCSV库解析了数据,结果是一个列表。

public List<UserRatingDto> uploadRatings(MultipartFile file) throws IOException{
BufferedReader fileReader = new BufferedReader(new
InputStreamReader(file.getInputStream(), "UTF-8"));
List<UserRatingDto> ratings = new CsvToBeanBuilder<UserRatingDto>(fileReader)
.withType(UserRatingDto.class)
.withSeparator(';')
.withIgnoreEmptyLine(true)
.withSkipLines(1)
.build()
.parse();
return ratings;
}

没有性能问题,解析大约需要1分钟。然而,为了将这些插入到表中,我需要从DB中获取书籍和用户以形成关系,我尝试使用@Async注释使方法async,我尝试并行流,我尝试将对象放入堆栈并使用saveAll()来批量插入,但它仍然需要太多时间。

public void saveRatings(final MultipartFile file) throws IOException{
List<UserRatingDto> userRatingDtos = uploadRatings(file);
userRatingDtos.parallelStream().forEach(bookRating->{
UserEntity user = userRepository.findByUserId(bookRating.getUserId());
bookRepository.findByISBN(bookRating.getBookISBN()).ifPresent(book -> {
BookRating bookRating1 = new BookRating();
bookRating1.setRating(bookRating.getBookRating());
bookRating1.setUser(user);
bookRating1.setBook(book);
book.getRatings().add(bookRating1);
user.getRatings().add(bookRating1);
bookRatingRepository.save(bookRating1);
});
});
}

这是我现在拥有的,有什么我可以改变使它更快吗?

问题是数据被一个接一个地获取和持久化。访问数据最高效的方式通常是well defined batches,然后遵循以下模式:

  • 获取处理批处理所需的数据
  • 处理内存中的批处理
  • 在获取下一批之前持久化处理结果

对于您的特定用例,您可以这样做:

public void saveRatings(final MultipartFile file) throws IOException {
List<UserRatingDto> userRatingDtos = uploadRatings(file);
// Split the list into batches
getBatches(userRatingDtos, 100).forEach(this::processBatch);
}
private void processBatch(List<UserRatingDto> userRatingBatch) {

// Retrieve all data required to process a batch
Map<String, UserEntity> users = userRepository
.findAllById(userRatingBatch.stream().map(UserRatingDto::getUserId).toList())
.stream()
.collect(toMap(UserEntity::getId, user -> user));
Map<String, Book> books = bookRepository.findAllByIsbn(userRatingBatch.stream().map(UserRatingDto::getBookISBN).toList())
.stream()
.collect(toMap(Book::getIsbn, book -> book));
// Process each rating in memory
List<BookRating> ratingsToSave = userRatingBatch.stream().map(bookRatingDto -> {
Book book = books.get(bookRatingDto.getBookISBN());
if (book == null) {
return null;
}
UserEntity user = users.get(bookRatingDto.getUserId());
BookRating bookRating = new BookRating();
bookRating.setRating(bookRatingDto.getBookRating());
bookRating.setUser(user);
bookRating.setBook(book);
book.getRatings().add(bookRating);
user.getRatings().add(bookRating);
return bookRating;
}).filter(Objects::nonNull).toList();
// Save data in batches
bookRatingRepository.saveAll(ratingsToSave);
bookRepository.saveAll(books.values());
userRepository.saveAll(users.values());
}
public <T> List<List<T>> getBatches(List<T> collection, int batchSize) {
List<List<T>> batches = new ArrayList<>();
for (int i = 0; i < collection.size(); i += batchSize) {
batches.add(collection.subList(i, Math.min(i + batchSize, collection.size())));
}
return batches;
}

注意,所有的I/O应该总是分批完成。如果你在内部处理循环中只有一个DB查找或保存,这将根本不起作用。

您可以尝试不同的batch sizes,看看什么能带来更好的性能——批处理越大,事务保持打开的时间就越长,并不是总是更大的批处理会带来更好的整体性能。

同时,确保您优雅地处理错误—例如:

  • 如果一个批处理抛出错误,您可以将这样的批处理分成两个,以此类推,直到只有一个评级失败。
  • 你也可以用backoff重试失败的批处理,例如,如果有数据库访问问题。
  • 您可以丢弃评级,例如,如果您有一个空的必填字段

编辑:根据OP的评论,这提高了10倍以上的性能。此外,如果排序不重要,通过并行处理每个批处理仍然可以大大提高性能。

EDIT 2:作为一般模式,理想情况下,我们一开始不会把所有的记录都放在内存中,而是检索数据并分批处理。这将进一步提高性能并避免OOM错误。

同样,这可以在许多并发模式中完成,例如使用专用线程来获取数据,工作线程来处理数据,以及另一组线程来持久化结果。

最简单的模式是让每个工作单元都是独立的——他们被赋予他们应该处理的(例如,从DB中获取的一组id),然后检索必要的数据进行处理,在内存中处理,并持久化结果。

为什么不使用这样的临时staging表(可能使用NOLOGGING和其他优化,如果可用):

CREATE TEMPORARY TABLE load_book_rating (
user_id BIGINT,
book_isbn TEXT,
rating TEXT
);

然后批量加载CSV数据到staging表中,然后批量将所有数据插入到真实表中,如下所示:

INSERT INTO book_rating (user_id, book_id, book_rating)
SELECT l.user_id, b.id, l.book_rating
FROM load_book_rating AS l
JOIN book AS b ON l.book_isbn = b.isbn

我可能忽略了模式中的一些细节,但我在这里的主要观点是,您可能只是因为ISBN自然键而没有将其用作BOOK表的主键,所以您必须执行查找?

或者,使用RDBMS的本地CSV导入功能。它们中的大多数都可以这样做,例如PostgreSQL的COPY命令

我很确定一个纯粹的基于SQL的方法会比你在Java中实现的任何其他方法都要好。

最新更新