RxJava - Building an async REST API using RxJava



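I'm looking at RxJava to build async support for our APIs. Right now we use Jetty + JAX-RS @Path annotations, and I'm not sure what the right approach is for tying an incoming REST API call to the RxJava API.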

Basically, the goal is to release the request thread until the response from the DB is ready.
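To make the thread-release idea concrete, here is a minimal sketch that uses only the JDK and stays Java 6 compatible (anonymous classes, no lambdas). It is not RxJava or JAX-RS code. The class `AsyncLookupSketch`, its `Callback` interface, and the fake `queryName` lookup are all hypothetical names made up for illustration. The calling thread hands the work to a pool and returns at once; the result arrives later through the callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: every name here is an assumption, not part of any library.
class AsyncLookupSketch {

    /** Invoked on a worker thread once the simulated DB call finishes. */
    interface Callback<T> {
        void onSuccess(T value);
        void onFailure(Throwable error);
    }

    private final ExecutorService dbPool = Executors.newFixedThreadPool(2);

    /** Stand-in for a real database query. */
    private static String queryName(int customerId) {
        if (customerId < 0) {
            throw new IllegalArgumentException("negative id");
        }
        return "customer-" + customerId;
    }

    /** Returns immediately; the callback fires later on a dbPool thread. */
    void findCustomerName(final int customerId, final Callback<String> callback) {
        dbPool.execute(new Runnable() {
            @Override
            public void run() {
                String name;
                try {
                    name = queryName(customerId);
                } catch (RuntimeException e) {
                    callback.onFailure(e);
                    return;
                }
                callback.onSuccess(name);
            }
        });
    }

    void shutdown() {
        dbPool.shutdown();
    }

    /** Demo helper only: blocks so the asynchronous outcome can be inspected. */
    static String lookupBlocking(int customerId) {
        AsyncLookupSketch service = new AsyncLookupSketch();
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> outcome = new AtomicReference<String>("timeout");
        service.findCustomerName(customerId, new Callback<String>() {
            @Override
            public void onSuccess(String value) {
                outcome.set(value);
                done.countDown();
            }

            @Override
            public void onFailure(Throwable error) {
                outcome.set("error: " + error.getMessage());
                done.countDown();
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdown();
        }
        return outcome.get();
    }
}
```

In a real server the request thread would not block as `lookupBlocking` does; the callback would instead complete the suspended HTTP response. The blocking helper exists only so the outcome can be checked from a single thread.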

I looked at Vert.x, but that requires Java 7, and we are tied to Java 6 for now.

I'm looking for suggestions on the above: what is the typical approach for tying an incoming HTTP request to the RxJava API?

Here is an example for JAX-RS:

Creating a Customer Observable
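import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import rx.Observable;
import rx.Subscriber;

public class ApiService {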
    Client client;
    public ApiService() {
        client = ClientBuilder.newClient();
    }
    public Observable<Customer> createCustomerObservable(final int customerId) {
        return Observable.create(new Observable.OnSubscribe<Customer>() {
            @Override
            public void call(final Subscriber<? super Customer> subscriber) {
                client
                        .target("http://domain.com/customers/{id}")
                        .resolveTemplate("id", customerId)
                        .request()
                        .async()
                        .get(new InvocationCallback<Customer>() {
                            @Override
                            public void completed(Customer customer) {
                                // Do something
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext(customer);
                                    subscriber.onCompleted();
                                }
                            }
                            @Override
                            public void failed(Throwable throwable) {
                                // Process error
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onError(throwable);
                                }
                            }
                        });
            }
        });
    }
}

Jetty:

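import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import rx.Observable;
import rx.Subscriber;

public class ApiService {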
    HttpClient httpClient;
    public ApiService(HttpClient httpClient) {
        this.httpClient = httpClient;
    }
    public <RequestType, ResultType> Observable<ResultType> createApiObservable(final RequestType requestContent) {
        return Observable.create(new Observable.OnSubscribe<ResultType>() {
            @Override
            public void call(final Subscriber<? super ResultType> subscriber) {
                // Create the request content for your API. Your logic here...
                ContentProvider contentProvider = serializeRequest(requestContent);
                httpClient
                        .newRequest("http://domain.com/path")
                        .content(contentProvider)
                        .send(new Response.CompleteListener() {
                            @Override
                            public void onComplete(Result result) {
                                // Pass along the error if one occurred.
                                if (result.isFailed()) {
                                    subscriber.onError(result.getFailure());
                                    return;
                                }
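                                // Convert the response data to the ResultType. Your logic here...
                                // Note: a plain CompleteListener does not buffer the body; if parseResponse
                                // needs the content, Jetty's BufferingResponseListener is one option.
                                ResultType resultContent = parseResponse(result.getResponse());
                                // Send the result to the subscriber.
                                subscriber.onNext(resultContent);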
                                subscriber.onCompleted();
                            }
                        });
            }
        });
    }
}
