多线程+ RxJava观察条件



我有一个长时间运行的任务,结果生成常规文件和一个列出其他文件的主文件。

调度程序每天通过cron重新生成一次这些文件。

任务流使用rx-java实现。

问题是,如果一个请求进入并启动任务,或者任务由调度程序运行,然后当任务正在进行时,一些其他请求来了,而不是等待任务完成,而是触发另一个执行。

那么问题是如何在任务执行时同步,使它只执行一次?

下面是示例代码:
@Service
public class FileService {
    @Autowired FileRepository fileRepository;
    @Autowired List<Pipeline> pipelines;
    public Observable<File> getMainFile() {
        if (fileRepository.isMainFileExists())
            return Observable.just(fileRepository.getMainFile());
        else
            return generate(() -> fileRepository.getMainFile());
    }
    public Observable<File> getFile(String fileName) {
        if (fileRepository.isMainFileExists())
            return Observable.just(fileRepository.getFile(fileName));
        else
            return generate(() -> fileRepository.getFile(fileName));
    }
    Observable<File> generate(Func0<File> whenGenerated) {
        return Observable.from(pipelines)
                // other business logic goes here
                // after task execution finished just get needed file
                .map(isAllPipelinesSuccessful -> {
                    return whenGenerated.call();
                });
    }
    @Scheduled(cron = "0 0 4 * * ?")
    void scheduleGeneration() {
        generate(() -> fileRepository.getMainFile()).subscribe();
    }
}

从控制器调用,示例代码如下:

@RestController
public class FileController {
    private static final Long TIMEOUT = 1_000 * 60 * 10L; //ten mins
    @Autowired FileService fileService;
    @RequestMapping(value = "/mainfile", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getMainFile() {
        DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
        Observable<File> observableMainFile = fileService.getMainFile();
        observableMainFile
                .map(this::fileToInputStreamResource)
                .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
                .subscribe(deferredResult::setResult, ex -> {
                deferredResult.setErrorResult(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null));
                });
        return deferredResult;
    }
    @RequestMapping(value = "/files/{filename:.+}", produces = "application/xml")
    public DeferredResult<ResponseEntity<InputStreamResource>> getFile(@PathVariable("filename") String filename) {
        DeferredResult<ResponseEntity<InputStreamResource>> deferredResult = new DeferredResult<>(TIMEOUT);
        Observable<File> observableFile = fileService.getFile(filename);
        observableFile
                .map(this::fileToInputStreamResource)
                .map(resource -> ResponseEntity.ok().cacheControl(CacheControl.maxAge(1, TimeUnit.HOURS).cachePublic()).body(resource))
                .subscribe(deferredResult::setResult, ex -> {
                    boolean isFileNotFound = FileNotFoundException.class.isInstance(ex.getCause());
                    HttpStatus status = isFileNotFound ? HttpStatus.NOT_FOUND : HttpStatus.INTERNAL_SERVER_ERROR;
                    deferredResult.setErrorResult(ResponseEntity.status(status).body(null));
                });
        return deferredResult;
    }
}

我有类似以下的东西,但我认为有更好的解决方案。我使用的是RxJava2-RC5。

  1. 答案缺少检查,该任务已执行。https://gist.github.com/anonymous/7b4717cea7ddce270a2e39850a3bd2a4

更新::

interface FileRepository {
        String getFile();
        Boolean isMainFileExists();
}
private static Scheduler executorService = Schedulers.from(Executors.newFixedThreadPool(1));
@org.junit.Test
public void schedulerTest123() throws Exception {
        FileRepository fRepo = mock(FileRepository.class);
        when(fRepo.getFile()).thenReturn("");
        when(fRepo.isMainFileExists()).thenReturn(false);
        Thread t1 = new Thread(() -> {
            getFile(fRepo, executorService).subscribe();
        });
        Thread t2 = new Thread(() -> {
            getFile(fRepo, executorService).subscribe();
        });
        t1.start();
        t2.start();
        Thread.sleep(3_000);
        when(fRepo.getFile()).thenReturn("DasFile");
        when(fRepo.isMainFileExists()).thenReturn(true);
        Thread t3 = new Thread(() -> {
            getFile(fRepo, executorService).subscribe();
        });
        t3.start();
        Thread.sleep(5_000);
}
private Observable<String> getFile(FileRepository fileRepo, Scheduler scheduler) {
        return Observable.defer(() -> {
            try {
                if (fileRepo.isMainFileExists()) {
                    return Observable.fromCallable(fileRepo::getFile)
                            .subscribeOn(Schedulers.io())
                            .doOnNext(s -> printCurrentThread("Get File from Repo"));
                } else {
                    return startLongProcess().doOnNext(s -> printCurrentThread("Push long processValue"));
                }
            } catch (Exception ex) {
                return Observable.error(ex);
            }
        }).subscribeOn(scheduler).doOnSubscribe(disposable -> printCurrentThread("SUB"));
    }
private Observable<String> startLongProcess() {
        return Observable.fromCallable(() -> {
            printCurrentThread("Doing LongProcess");
            Thread.sleep(5_000);
            return "leFile";
        });
}
private void printCurrentThread(String additional) {
        System.out.println(additional + "_" + Thread.currentThread());
}

最新更新