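I have a long-running task that produces regular files plus a main file that lists the other files.
A scheduler regenerates these files once a day via cron.
The task flow is implemented with rx-java.
The problem is this: if a request comes in and starts the task, or the scheduler starts it, and another request arrives while the task is still running, that second request does not wait for the task to finish but triggers another execution instead.
So the question is: how can I synchronize on the task execution so that it runs only once?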
Here is the sample code:

@Service
public class FileService {
    @Autowired FileRepository fileRepository;
    @Autowired List<Pipeline> pipelines;

    public Observable<File> getMainFile() {
        if (fileRepository.isMainFileExists())
            return Observable.just(fileRepository.getMainFile());
        else
            return generate(() -> fileRepository.getMainFile());
    }

    public Observable<File> getFile(String fileName) {
        if (fileRepository.isMainFileExists())
            return Observable.just(fileRepository.getFile(fileName));
        else
            return generate(() -> fileRepository.getFile(fileName));
    }

    Observable<File> generate(Func0<File> whenGenerated) {
        return Observable.from(pipelines)
            // other business logic goes here
            // after task execution finished just get needed file
            .map(isAllPipelinesSuccessful -> {
                return whenGenerated.call();
            });
    }

    @Scheduled(cron = "0 0 4 * * ?")
    void scheduleGeneration() {
        generate(() -> fileRepository.getMainFile()).subscribe();
    }
}

It is called from a controller; sample code below:

@RestController
public class FileController {
    private static final Long TIMEOUT = 1_000 * 60 * 10L; // ten minutes

    @Autowired FileService fileService;

    @RequestMapping(value = "/mainfile", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() {
        DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
        Observable<File> observableMainFile = fileService.getMainFile();
        observableMainFile
            .map(this::fileToInputStreamResource)
            .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
            .subscribe(deferredResult::setResult, ex -> {
                deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null));
            });
        return deferredResult;
    }

    @RequestMapping(value = "/files/{filename:.+}", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) {
        DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
        Observable<File> observableFile = fileService.getFile(filename);
        observableFile
            .map(this::fileToInputStreamResource)
            .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
            .subscribe(deferredResult::setResult, ex -> {
                boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause());
                HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR;
                deferredResult.setErrorResult(ResponseEntity.status(status).body(null));
            });
        return deferredResult;
    }
}
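
I have something like the following, but I think there is a better solution. I am using RxJava2-RC5.
- The answer is missing a check for whether the task has already been executed. https://gist.github.com/anonymous/7b4717cea7ddce270a2e39850a3bd2a4

Update:
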
interface FileRepository {
    String getFile();
    Boolean isMainFileExists();
}

private static Scheduler executorService = Schedulers.from(Executors.newFixedThreadPool(1));

@org.junit.Test
public void schedulerTest123() throws Exception {
    FileRepository fRepo = mock(FileRepository.class);
    when(fRepo.getFile()).thenReturn("");
    when(fRepo.isMainFileExists()).thenReturn(false);

    Thread t1 = new Thread(() -> {
        getFile(fRepo, executorService).subscribe();
    });
    Thread t2 = new Thread(() -> {
        getFile(fRepo, executorService).subscribe();
    });
    t1.start();
    t2.start();

    Thread.sleep(3_000);

    when(fRepo.getFile()).thenReturn("DasFile");
    when(fRepo.isMainFileExists()).thenReturn(true);

    Thread t3 = new Thread(() -> {
        getFile(fRepo, executorService).subscribe();
    });
    t3.start();

    Thread.sleep(5_000);
}

private Observable<String> getFile(FileRepository fileRepo, Scheduler scheduler) {
    return Observable.defer(() -> {
        try {
            if (fileRepo.isMainFileExists()) {
                return Observable.fromCallable(fileRepo::getFile)
                    .subscribeOn(Schedulers.io())
                    .doOnNext(s -> printCurrentThread("Get File from Repo"));
            } else {
                return startLongProcess().doOnNext(s -> printCurrentThread("Push long processValue"));
            }
        } catch (Exception ex) {
            return Observable.error(ex);
        }
    }).subscribeOn(scheduler).doOnSubscribe(disposable -> printCurrentThread("SUB"));
}

private Observable<String> startLongProcess() {
    return Observable.fromCallable(() -> {
        printCurrentThread("Doing LongProcess");
        Thread.sleep(5_000);
        return "leFile";
    });
}

private void printCurrentThread(String additional) {
    System.out.println(additional + "_" + Thread.currentThread());
}
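
The code above serializes callers on a single-threaded scheduler but, as noted, does not check whether a run is already in flight. As a rough illustration of that missing check, here is a minimal sketch that uses plain JDK classes (`CompletableFuture` and `AtomicReference`) rather than RxJava. `SingleFlight` and its `run` method are hypothetical names invented for this sketch; they are not part of RxJava or Spring. The idea is that the first caller starts the task and publishes its future, and callers that arrive while it is running get that same future instead of starting a second run. In RxJava a similar effect might be reachable by handing out one shared Observable (for example via `share()` or `replay().refCount()`), but that variant is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not an RxJava or Spring class): lets concurrent
 * callers share one in-flight execution of a long-running task.
 */
public class SingleFlight<T> {
    // Holds the future of the run currently in progress, or null when idle.
    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();

    public CompletableFuture<T> run(Supplier<T> task, Executor executor) {
        while (true) {
            CompletableFuture<T> current = inFlight.get();
            if (current != null) {
                // A run is already in progress: wait on it instead of starting another.
                return current;
            }
            CompletableFuture<T> mine = new CompletableFuture<>();
            // Only one caller wins this compare-and-set and actually starts the task.
            if (inFlight.compareAndSet(null, mine)) {
                executor.execute(() -> {
                    try {
                        T value = task.get();
                        // Clear the slot before completing, so later callers start a fresh run.
                        inFlight.compareAndSet(mine, null);
                        mine.complete(value);
                    } catch (Throwable t) {
                        inFlight.compareAndSet(mine, null);
                        mine.completeExceptionally(t);
                    }
                });
                return mine;
            }
            // Lost the race to another caller: loop and pick up its future.
        }
    }
}
```

In the service from the question, both the request path and the `@Scheduled` method would go through the same `SingleFlight` instance, and the existing `isMainFileExists()` check would still run first so that requests arriving after a finished run read the file from the repository instead of regenerating it.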