如何在反应式编程中使用增量id在数据库中创建条目



我是反应式编程的新手。需要帮助来理解行为。

我想实现什么?

我想创建50个条目,它们的id应该是递增顺序的。如果db中存在id为1的条目,那么它应该创建id为2的条目。

我目前的实现如下:

// create entry 50 times 
void createEntries() {
LOGGER.info("going to create 50 entries);
Flux.range(1, 50)
.flatMap(i -> createEntry(5))
.subscribe();
}
//method to create an entry in db with incremental id  
private Mono<Integer> createEntry(long retryInterval) {
return (customRepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys> 
.map(entry -> entry.getEntryId())
.sort()
//get last existing entry id
.last(0)          
//try to create the entry with new incremented id
.flatMap(id -> createEntry(id + 1, retryInterval));
}
private Mono<? extends Integer> createEntry(int newEntryId, long retryInterval) {
return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
.doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ", newEntryId, applied)) //--> Why this is called multiple times??
.flatMap(applied -> !applied
//applied false shows id already exists, so try again recursively with new incremented id
? createEntry(newEntryId + 1, retryInterval)
: Mono.just(newEntryId))
.doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ", newEntryId, e));
.retryWhen(Retry.anyOf(RuntimeException.class)
.exponentialBackoff(Duration.ofSeconds(retryInterval), Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception 
}

上面的实现给了我意想不到的行为信息记录器"已成功创建id为"的条目被多次调用用于同一id。。然而,我希望它只被调用一次。注意:即使删除retryWhen,行为也保持不变。

最后我在代码中解决了这个问题。问题出现在以下代码段中

// create entry 50 times 
void createEntries() {
LOGGER.info("going to create 50 entries);
Flux.range(1, 50)
.flatMap(i -> createEntry(5))
.subscribe();
}

这是在saveEntry(newEntryId)完成之前的50次调用方法。我使用repeatapi修复了这个问题,如下所示:

// create entry 50 times 
void createEntries() {
LOGGER.info("going to create 50 entries);
createEntry(5).subscribe();
}
//method to create an entry in db with incremental id  
private Flux<? extends Integer> createEntry(long retryInterval) {
return (customRepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys> 
.map(entry -> entry.getEntryId())
.sort()
//get last existing entry id
.last(0)          
//try to create the entry with new incremented id
.flatMap(id -> createEntry(id + 1, retryInterval))
.repeat(49); //-->This fixes my issue will only be invoked  49 times again onComplete(). And hence will create 50 entries
}
private Mono<? extends Integer> createEntry(int newEntryId, long retryInterval) {
return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
.doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ", newEntryId, applied)) 
.flatMap(applied -> !applied
//applied false shows id already exists, so try again recursively with new incremented id
? createEntry(newEntryId + 1, retryInterval)
: Mono.just(newEntryId))
.doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ", newEntryId, e));
.retryWhen(Retry.anyOf(RuntimeException.class)
.exponentialBackoff(Duration.ofSeconds(retryInterval), Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception 
}

最新更新