我当前的代码使用了一系列异步进程,最终得到结果。我需要以这样一种方式包装它们,即每个都可以由同步方法访问,并将结果作为返回值。我希望使用执行器服务来实现这一点,以便允许其中许多操作同时发生。我感觉Future可能与我的实现相关,但我想不出一个好方法来实现它。
我现在的情况:
public class DoAJob {
ResultObject result;
public void stepOne() {
// Passes self in for a callback
otherComponent.doStepOne(this);
}
// Called back by otherComponent once it has completed doStepOne
public void stepTwo(IntermediateData d) {
otherComponent.doStepTwo(this, d);
}
// Called back by otherComponent once it has completed doStepTwo
public void stepThree(ResultObject resultFromOtherComponent) {
result = resultFromOtherComponent;
//Done with process
}
}
这在内部工作得很好,但现在我需要将我的过程映射到一个带有返回值的同步方法中,如:
public ResultObject getResult(){
// ??? What goes here ???
}
有没有人有一个好主意,关于如何实现这一优雅?
如果您想将异步操作(在完成时执行回调)转换为同步/阻塞操作,您可以使用阻塞队列。如果你愿意,你可以把它封装在Future对象中。
-
定义一个只能容纳一个元素的阻塞队列:
BlockingQueue<Result> blockingQueue = new ArrayBlockingQueue<Result>(1);
-
启动异步进程(将在后台运行),并编写回调,这样当它完成时,它将其结果添加到阻塞队列中。
-
在前台/应用程序线程中,让它从队列中取出(),该队列阻塞直到元素可用:
Result result = blockingQueue.take();
我以前写过类似的东西(前台线程需要阻塞来自远程机器的异步响应),使用类似Future的东西,您可以在这里找到示例代码。
我对Guava库做了类似的事情;这些链接可能会为您指明正确的方向:
是可能的链异步调用使用番石榴吗?
https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained如果你喜欢动手,你可以这样做
ResultObject result;
public void stepOne()
otherComponent.doStepOne(this);
synchronized(this)
while(result==null) this.wait();
return result;
public void stepThree(ResultObject resultFromOtherComponent)
result = resultFromOtherComponent;
synchronized(this)
this.notify();
或者你可以使用更高级别的并发工具,如BlockingQueue, Semaphore, CountdownLatch, Phaser等。
请注意,DoAJob
不是线程安全的-如果两个线程同时调用stepOne
,则会导致问题。
我建议使用invokeAll(..)。它将向执行器提交一组任务,并阻塞直到最后一个任务完成(成功/例外)。然后它返回一个完整的Future对象列表,因此您可以循环它们并将结果合并到单个ResultObject中。
如果您希望仅以同步方式运行单个任务,则可以使用以下命令:
executor.invokeAll(Collections.singleton(task));
——编辑——
现在我想我更了解你的需要了。我假设您需要一种提交独立任务序列的方法。请看看代码我张贴在这个答案。Bumerang是我的异步http请求库,它是为Android构建的http请求使用Java -> https://github.com/hanilozmen/Bumerang。我需要在不触及库的情况下进行同步调用。这是我的完整代码。npgal的回答启发了我,谢谢!类似的方法可以应用于所有类型的异步库。
public class TestActivity extends Activity {
MyAPI api = (MyAPI) Bumerang.get().initAPI(MyAPI.class);
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<Object>(1);
static int indexForTesting;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_test);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10; i++) {
getItems();
try {
Object response = blockingQueue.take(); // waits for the response
Log.i("TAG", "index " + indexForTesting + " finished. Response " + response.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
t.start();
}
void getItems() {
Log.i("TAG", "index " + ++indexForTesting + " started");
api.getItems(new ResponseListener<Response<List<ResponseModel>>>() {
@Override
public void onSuccess(Response<List<ResponseModel>> response) {
List<ResponseModel> respModel = response.getResponse();
try {
blockingQueue.put(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Response<List<ResponseModel>> response) {
Log.i("onError", response.toString());
try {
blockingQueue.put(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}