我有一个名为处理器的接口:
public interface Processor {
MyResponse process(Request request, String id);
}
我有这个处理器的两个实现,一个是同步的,另一个是异步的(都是Spring Service类(:
这是同步版本,完成作业并给出完整的响应:
@Service
public class SynchrnousProcess implements Processor {
@Override
public MyResponse process(Request request, String id) {
Job job = getJob(request);
JobParameters parameter = buildParams(request, id);
JobExecution jobExecution = kickOfJob(job, request, parameter);
return buildResponse(request, trackingId, jobExecution);
}
}
这是异步版本,它将请求添加到阻塞队列:
@Service
public class AsyncProcess implements Processor {
private BlockingQueue<Pair> requestQueue = new ArrayBlockingQueue<>(100);
@Override
public MyResponse process(Request request, String id) {
//add request to requestQueue
}
public void completeProcess() {
//take data from queue and process
log.info("start process !!!!!!!!!!!!!!!");
if (!CollectionUtils.isEmpty(requestQueue)) {
requestQueue.stream().forEach(pair -> {
String trackingId = String.valueOf(pair.getFirst());
FeedDto feedDto = (FeedDto) pair.getSecond();
Request request = feedDto.getRequest();
Job job = getJob(request);
JobParameters parameter = getJobParameters(request, trackingId);
JobExecution jobExecution = runJob(job, request, parameter);
}
}
正如您所看到的,completeProcess()
从队列中取出数据并处理作业。用于获取作业、构建参数的通用代码在同步运行和异步之间重复。
另一个线程在后台运行completeProcess()
。
我想实现一个干净的设计,并将两个接口实现共享的公共代码移到一个地方。我如何才能实现我正在努力实现的目标?看看在这种情况下使用的设计模式的例子真的会很有帮助吗?
此外,任何关于处理排队请求的ExecutorService应该在哪里启动以及处理这些请求的线程应该如何启动的建议都会有很大帮助。
您可以设计接口,因为它必须执行异步操作。例如:
public interface Callback() {
void onResponse(MyResponse mr);
}
public void Processor {
void process(Request request, String id, Callback callback);
}
@Service
public class SynchrnousProcess implements Processor {
@Override
public void process(Request request, String id, Callback callback) {
Job job = getJob(request);
JobParameters parameter = buildParams(request, id);
JobExecution jobExecution = kickOfJob(job, request, parameter);
MyResponse r = buildResponse(request, trackingId, jobExecution);
callback.onResponse(r);
}
}
@Service
public class AsyncProcess implements Processor {
private BlockingQueue<Pair> requestQueue = new ArrayBlockingQueue<>(100);
@Override
public void process(Request request, String id, Callback callback) {
//add request and callback to requestQueue
}
}
我不知道谁将处理队列,但谁将使用Processor肯定不必担心调用是否同步。
这是与您在C#中使用async await的方法最相似的方法。在C#中,您必须返回一个Task<MyResponse>即使在同步的情况下也是如此。然后,使用者将始终认为调用将是异步的。