用带有返回值的同步方法包装一系列异步调用



我当前的代码使用了一系列异步进程,最终得到结果。我需要以这样一种方式包装它们,即每个都可以由同步方法访问,并将结果作为返回值。我希望使用执行器服务来实现这一点,以便允许其中许多操作同时发生。我感觉Future可能与我的实现相关,但我想不出一个好方法来实现它。

我现在的情况:

public class DoAJob {
  ResultObject result;
  public void stepOne() {
    // Passes self in for a callback
    otherComponent.doStepOne(this);
  }
  // Called back by otherComponent once it has completed doStepOne
  public void stepTwo(IntermediateData d) {
    otherComponent.doStepTwo(this, d);
  }
  // Called back by otherComponent once it has completed doStepTwo
  public void stepThree(ResultObject resultFromOtherComponent) {
    result = resultFromOtherComponent;
  //Done with process
  }
}

这在内部工作得很好,但现在我需要将我的过程映射到一个带有返回值的同步方法中,如:

public ResultObject getResult(){
  // ??? What goes here ???
}

有没有人有一个好主意,关于如何实现这一优雅?

如果您想将异步操作(在完成时执行回调)转换为同步/阻塞操作,您可以使用阻塞队列。如果你愿意,你可以把它封装在Future对象中。

  1. 定义一个只能容纳一个元素的阻塞队列:

    BlockingQueue<Result> blockingQueue = new ArrayBlockingQueue<Result>(1);

  2. 启动异步进程(将在后台运行),并编写回调,这样当它完成时,它将其结果添加到阻塞队列中。

  3. 在前台/应用程序线程中,让它从队列中取出(),该队列阻塞直到元素可用:

    Result result = blockingQueue.take();

我以前写过类似的东西(前台线程需要阻塞来自远程机器的异步响应),使用类似Future的东西,您可以在这里找到示例代码。

我对Guava库做了类似的事情;这些链接可能会为您指明正确的方向:

是可能的链异步调用使用番石榴吗?

https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

如果你喜欢动手,你可以这样做

ResultObject result;
public void stepOne() 
    otherComponent.doStepOne(this);
    synchronized(this)
        while(result==null) this.wait();
    return result;
public void stepThree(ResultObject resultFromOtherComponent) 
    result = resultFromOtherComponent;
    synchronized(this)
        this.notify();

或者你可以使用更高级别的并发工具,如BlockingQueue, Semaphore, CountdownLatch, Phaser等。

请注意,DoAJob不是线程安全的-如果两个线程同时调用stepOne,则会导致问题。

我建议使用invokeAll(..)。它将向执行器提交一组任务,并阻塞直到最后一个任务完成(成功/例外)。然后它返回一个完整的Future对象列表,因此您可以循环它们并将结果合并到单个ResultObject中。

如果您希望仅以同步方式运行单个任务,则可以使用以下命令:

executor.invokeAll(Collections.singleton(task)); 

——编辑——

现在我想我更了解你的需要了。我假设您需要一种提交独立任务序列的方法。请看看代码我张贴在这个答案。

Bumerang是我的异步http请求库,它是为Android构建的http请求使用Java -> https://github.com/hanilozmen/Bumerang。我需要在不触及库的情况下进行同步调用。这是我的完整代码。npgal的回答启发了我,谢谢!类似的方法可以应用于所有类型的异步库。

public class TestActivity extends Activity {
MyAPI api = (MyAPI) Bumerang.get().initAPI(MyAPI.class);
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<Object>(1);
static int indexForTesting;
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_test);
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            for(int i = 0; i < 10; i++) {
                getItems();
                try {
                    Object response = blockingQueue.take(); // waits for the response
                    Log.i("TAG", "index " + indexForTesting + " finished. Response " + response.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });
    t.start();
}
void getItems() {
    Log.i("TAG", "index " + ++indexForTesting + " started");
    api.getItems(new ResponseListener<Response<List<ResponseModel>>>() {
        @Override
        public void onSuccess(Response<List<ResponseModel>> response) {
            List<ResponseModel> respModel = response.getResponse();
            try {
                blockingQueue.put(response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void onError(Response<List<ResponseModel>> response) {
            Log.i("onError", response.toString());
            try {
                blockingQueue.put(response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

}

最新更新