在MongoDB中停止写入线程并保留已提交的结果



有一个PUT方法:`/api/v1/fias/{fileName}`。此方法解析文件。解析在单独的线程中进行,因此对用户来说,该方法会立即返回,并返回在Mongo数据库中创建的实体的ID。

有一个POST方法:`/api/v1/fias/interrupt/{objectId}`。此方法应该中止由PUT方法启动的文件解析线程。这能实现吗?

我试着这样做:

/**
 * Declares the task executor bean registered under the name "threadPoolTaskExecutor".
 *
 * <p>Core and maximum pool size are both 2, up to 500 tasks may wait in the queue, and
 * worker threads are named with the "codeinside-" prefix.
 *
 * @return the configured and initialized executor
 */
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor executorService() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Naming first, sizing second; all of it is applied when initialize() runs.
    pool.setThreadNamePrefix("codeinside-");
    pool.setCorePoolSize(2);
    pool.setMaxPoolSize(2);
    pool.setQueueCapacity(500);
    pool.initialize();
    return pool;
}
/**
 * Handles {@code PUT /api/v1/fias/{fileName}}.
 *
 * <p>Delegates to {@code fiasQueryService.updateFiasByFileName} and wraps the document it
 * returns in a 201 (Created) response.
 *
 * @param fileName value of the {@code fileName} path variable
 * @return a 201 response whose body is the document produced by the service
 * @throws BadParamException declared by this endpoint; not raised by the code visible here
 * @throws NotFinishProcessException propagated from the service call
 */
@RequestMapping(value = "/api/v1/fias/{fileName}", method = PUT)
public ResponseEntity<Document> update(@PathVariable(value = "fileName") String fileName)
        throws BadParamException, NotFinishProcessException {
    Document responseBody = fiasQueryService.updateFiasByFileName(fileName);
    return ResponseEntity.status(HttpStatus.CREATED).body(responseBody);
}
/**
 * Handles {@code POST /api/v1/fias/interrupt/{objectId}}.
 *
 * <p>Delegates to {@code fiasQueryService.interrupt} and returns its document with a
 * 200 (OK) status.
 *
 * @param objectId value of the {@code objectId} path variable
 * @return a 200 response whose body is the document produced by the service
 */
@RequestMapping(value = "/api/v1/fias/interrupt/{objectId}", method = POST)
public ResponseEntity<Document> interrupt(@PathVariable(value = "objectId") String objectId) {
    Document responseBody = fiasQueryService.interrupt(objectId);
    return ResponseEntity.status(HttpStatus.OK).body(responseBody);
}
/**
 * Service that starts a FIAS file import through {@link AsyncFias} and lets a later request
 * cancel it.
 *
 * <p>Only the members involved in starting and interrupting the import appear in this excerpt;
 * {@code settingsService}, {@code checkAndInsertStatus}, {@code setStatus} and the constants
 * used below are defined elsewhere.
 */
@Service
public class FiasQueryServiceImpl implements FiasQueryService {
    @Autowired
    private AsyncFias asyncFias;
    @Autowired
    private ThreadPoolTaskExecutor executorService;

    /**
     * Handle of the most recently started import; {@code null} until the first import has been
     * started. Volatile because it is written by the request that starts the import and read
     * by the request that interrupts it.
     */
    private volatile CompletableFuture<Integer> asyncResult;

    /**
     * Starts the import of the named file and returns immediately.
     *
     * @param fileName name of the file, resolved against the directory configured under
     *     {@code FIAS_FILE_PATH}
     * @return a document with the string form of the id obtained from
     *     {@code checkAndInsertStatus} under {@code "_id"} and {@code "success": true}
     * @throws NotFinishProcessException declared by the interface; presumably raised by
     *     {@code checkAndInsertStatus} while a previous import is unfinished (confirm)
     */
    @Override
    public Document updateFiasByFileName(String fileName) throws NotFinishProcessException {
        String settingsPath = settingsService.getStringParam(FIAS_FILE_PATH);
        File file = new File(settingsPath + "/" + fileName);
        ObjectId objectId = checkAndInsertStatus(file.getName().toLowerCase());
        asyncResult = asyncFias.startUpdate(file, objectId);
        return new Document("_id", objectId.toString()).append("success", true);
    }

    /**
     * Cancels the most recently started import and marks it as interrupted.
     *
     * @param objectIdString hex string of the status record id
     * @return {@code "success": true} plus a {@code "count"} entry when the future was
     *     cancelled; {@code "success": false} when no import was started or the future could
     *     not be cancelled (for example because it had already completed)
     * @throws IllegalArgumentException if {@code objectIdString} is not a valid ObjectId
     */
    @Override
    public Document interrupt(String objectIdString) {
        // Parse once, up front, so a malformed id is rejected before anything is changed.
        ObjectId objectId = new ObjectId(objectIdString);

        // Read the shared field once. It stays null until updateFiasByFileName has run, and
        // calling cancel() on it unguarded is what raises the NullPointerException.
        CompletableFuture<Integer> running = asyncResult;
        if (running == null) {
            return new Document("success", false);
        }

        setStatus(objectId, INTERRUPT);
        // NOTE: per the JDK documentation, CompletableFuture.cancel ignores
        // mayInterruptIfRunning; it completes the future exceptionally but does not interrupt
        // the thread that is executing the task.
        running.cancel(true);
        if (!running.isCancelled()) {
            return new Document("success", false);
        }

        // A cancelled future has no result: get() would throw the unchecked
        // CancellationException, so the count is reported as null instead of being fetched.
        Integer cnt = null;
        // Status is written again after cancellation, as in the original flow; presumably this
        // guards against the worker overwriting it in between (confirm).
        setStatus(objectId, INTERRUPT);
        return new Document("success", true).append("count", cnt);
    }
}
/**
 * Performs the FIAS import on the executor named "threadPoolTaskExecutor".
 */
@Service
public class AsyncFias {
    /**
     * Imports the given file into MongoDB and reports how much was written.
     *
     * <p>Large parts of the method are elided in this excerpt; {@code mongo},
     * {@code collectionInfo}, {@code dbObjects} and {@code count} come from that elided code.
     *
     * @param file the file to import
     * @param objectId id of the status record associated with this import
     * @return an already-completed future holding {@code count}
     */
    @Async("threadPoolTaskExecutor")
    // NOTE(review): @Scope is documented for @Component types and @Bean methods only; it
    // appears to have no effect on an @Async method (confirm, then remove).
    @Scope(BeanDefinition.SCOPE_PROTOTYPE)
    public CompletableFuture<Integer> startUpdate(File file, ObjectId objectId) {
        // a lot of code.......
        MongoDatabase db = getDb(collectionInfo);
        MongoCollection<Document> collection =
                db.getCollection(collectionInfo.getCollectionName());
        // try-with-resources closes the session even when insertMany throws; the session is
        // opened only after the collection lookup so a failing lookup cannot leak it either.
        try (ClientSession session = mongo.startSession(
                ClientSessionOptions.builder().causallyConsistent(true).build())) {
            collection.insertMany(session, dbObjects);
        }
        // a lot of code.......
        return CompletableFuture.completedFuture(count);
    }
}

但我在 asyncResult.cancel(true); 这一行得到了NPE。我还尝试用 executorService.shutdown(); 来停止工作线程,但在这种情况下,线程停止时本应已写入的记录被回滚了。如何停止写入线程,同时保留停止之前已经写入的记录?

我更改了startUpdate方法:

/**
 * Imports a FIAS file into MongoDB in batches on the "threadPoolTaskExecutor" pool and
 * supports cooperative cancellation through thread interruption.
 *
 * <p>At most one import inserts at a time: a call that arrives while another import is
 * inserting returns without inserting anything.
 */
@Service
public class AsyncFias {
    private static final Logger log = LoggerFactory.getLogger(AsyncFias.class);

    /** Number of documents handed to MongoDB per bulk insert. */
    private static final int BATCH_SIZE = 1000;

    private final FiasFileService fiasFileService;
    private final MongoDBService mongoDBService;

    /** True while one call of {@link #startUpdate(File)} is inserting batches. */
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /** Batches inserted by the current run, or by the last run once it has ended. */
    private final AtomicInteger count = new AtomicInteger(0);

    AsyncFias(FiasFileService fiasFileService, MongoDBService mongoDBService) {
        this.fiasFileService = fiasFileService;
        this.mongoDBService = mongoDBService;
    }

    /**
     * Returns the number of batches inserted so far.
     *
     * <p>The counter is reset when a run starts, not when it ends, so the value stays readable
     * after a run has been cancelled.
     *
     * @return batches inserted by the current or most recent run
     */
    public Integer getIncrement() {
        return count.get();
    }

    /**
     * Parses the file and inserts its documents in batches of {@value #BATCH_SIZE}.
     *
     * <p>The loop stops as soon as the executing thread is interrupted; batches inserted before
     * that point are left in place.
     *
     * @param file the file to import; its lower-cased name selects the mapper and collection
     * @return a completed {@link AsyncResult} holding {@code null}
     * @throws InterruptedException declared for callers; the interruption of the pause between
     *     batches is handled inside the method
     */
    @Async("threadPoolTaskExecutor")
    // NOTE(review): @Scope is documented for @Component types and @Bean methods only; it
    // appears to have no effect on an @Async method (confirm, then remove).
    @Scope(BeanDefinition.SCOPE_PROTOTYPE)
    public Future<Void> startUpdate(File file) throws InterruptedException {
        DbfUtilEnum utilEnum = DbfUtilEnum.fromFileName(file.getName().toLowerCase());
        DbfMapper<Document> objMapper = utilEnum.getDbfMapper();
        List<Document> dbObjects = fiasFileService.processFile(file, objMapper);
        String collectionName = utilEnum.getCollectionName();
        EntryMetaInfo metaInfo = new EntryMetaInfo(collectionName, collectionName, null, false, null);
        // The partitioned view is never cleared: it is a local that simply goes out of scope,
        // and if ListUtils is the Apache Commons Collections class, its partition() result is
        // documented as unmodifiable, so clear() would throw.
        List<List<Document>> batches = ListUtils.partition(dbObjects, BATCH_SIZE);

        if (!inProgress.compareAndSet(false, true)) {
            // Another run owns the flag and the counter; leave both untouched.
            return finishedResult();
        }
        // Reset at the start of the owning run so a cancelled run's total remains readable.
        count.set(0);
        try {
            for (List<Document> batch : batches) {
                if (Thread.currentThread().isInterrupted()) {
                    log.info("Cancelled");
                    break;
                }
                mongoDBService.insertBulk(metaInfo, batch);
                count.getAndIncrement();
                try {
                    // Short pause that doubles as an interruption point between batches.
                    TimeUnit.MILLISECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller's thread-pool machinery.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Always release the guard, including after an interrupt or a failed insert;
            // otherwise every later call would skip the import.
            inProgress.set(false);
        }
        return finishedResult();
    }

    /** Builds the value returned by every exit path of {@link #startUpdate(File)}. */
    private static Future<Void> finishedResult() {
        AsyncResult<Void> result = new AsyncResult<>(null);
        // NOTE(review): kept from the original flow; cancel() on Spring's AsyncResult appears
        // to be a no-op that returns false (confirm against the AsyncResult Javadoc).
        result.cancel(true);
        return result;
    }
}
/**
 * Service that starts a FIAS file import through {@code asyncFias} and lets a later request
 * cancel it.
 *
 * <p>Much of the class is elided in this excerpt; {@code asyncFias}, {@code settingsService},
 * {@code log}, {@code checkAndInsertStatus} and {@code setStatus} are defined in that elided
 * part.
 */
@Service
public class FiasQueryServiceImpl implements FiasQueryService {
    /**
     * Handle of the most recently started import; {@code null} until the first import has been
     * started. Volatile because it is written by the request that starts the import and read
     * by the request that interrupts it.
     */
    private volatile Future<Void> asyncResult;
    // a lot of code

    /**
     * Starts the import of the named file and returns immediately.
     *
     * @param fileName name of the file, resolved against the directory configured under
     *     {@code FIAS_FILE_PATH}
     * @return a document with the string form of the id obtained from
     *     {@code checkAndInsertStatus} under {@code "_id"} and {@code "success": true}
     * @throws NotFinishProcessException declared by the interface; presumably raised by
     *     {@code checkAndInsertStatus} while a previous import is unfinished (confirm)
     * @throws InterruptedException declared by {@code AsyncFias.startUpdate}
     */
    @Override
    public Document updateFiasByFileName(String fileName) throws NotFinishProcessException, InterruptedException {
        String settingsPath = settingsService.getStringParam(FIAS_FILE_PATH);
        File file = new File(settingsPath + "/" + fileName);
        ObjectId objectId = checkAndInsertStatus(file.getName().toLowerCase());
        // AsyncFias exposes the import as startUpdate(File); there is no process(File) on it.
        asyncResult = asyncFias.startUpdate(file);
        return new Document("_id", objectId.toString()).append("success", true);
    }

    /**
     * Cancels the most recently started import and marks it as interrupted.
     *
     * @param objectIdString hex string of the status record id
     * @return {@code "success": true} with the batch count reported by
     *     {@code asyncFias.getIncrement()} when the future was cancelled;
     *     {@code "success": false} when no import was started or it could not be cancelled
     * @throws IllegalArgumentException if {@code objectIdString} is not a valid ObjectId
     */
    @Override
    public Document interrupt(String objectIdString) {
        // Validate the id before cancelling, so a malformed request has no side effects.
        ObjectId objectId = new ObjectId(objectIdString);

        // Read the shared field once; it is null until an import has been started.
        Future<Void> running = asyncResult;
        if (running == null) {
            return new Document("success", false);
        }

        running.cancel(true);
        if (!running.isCancelled()) {
            return new Document("success", false);
        }

        log.info("asyncResult.isCancelled()");
        setStatus(objectId, INTERRUPT);
        return new Document("success", true).append("count", asyncFias.getIncrement());
    }
}

一般结论:对于Future类型的对象,必须调用cancel(true)方法两次:在创建Future时和停止工作线程时。

最新更新