我有一个简单的 REST 服务,它有一个睡眠方法,它只在指定的时间内(以毫秒为单位)休眠,然后返回无内容响应。我的 RESTTest 类尝试首先调用http://localhost:8080/myapp/rest/sleep/7500
(休眠 7.5 秒),但只等待 5 秒。5 秒后,它会取消收到的 Future(尝试取消挂起的请求)并调用 http://localhost:8080/myapp/rest/sleep/5000
(休眠 5 秒)并等待 5 秒。
public class RESTTest {
private final Client client = ClientBuilder.newClient();
private final ReentrantLock lock = new ReentrantLock();
private final Condition responseReceived = lock.newCondition();
public static void main(final String... arguments) {
new RESTTest().listen(10000);
}
public void listen(final long time) {
System.out.println("Listen for " + time + " ms.");
Future<Response> _response =
client.
target("http://localhost:8080/myapp/rest/sleep/" + time)).
request().
async().
get(
new InvocationCallback<Response>() {
public void completed(final Response response) {
System.out.println("COMPLETED");
lock.lock();
try {
responseReceived.signalAll();
} finally {
lock.unlock();
}
}
public void failed(final Throwable throwable) {
lock.lock();
try {
responseReceived.signalAll();
} finally {
lock.unlock();
}
}
});
lock.lock();
try {
System.out.println("Waiting for 5000 ms.");
if (!responseReceived.await(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out!");
_response.cancel(true);
listen(5000);
} else {
System.out.println("Response received.");
}
} catch (final InterruptedException exception) {
// Do nothing.
} finally {
lock.unlock();
}
}
}
现在我希望看到"已完成"字符串只打印一次,而"收到的响应"字符串也只打印一次。但是,"已完成"字符串会打印两次!
Listen for 7500 ms.
Waiting for 5000 ms.
Timed out!
Listen for 5000 ms.
Waiting for 5000 ms.
COMPLETED
Response received.
COMPLETED
我在这里错过了什么?
谢谢
你已经想通了,但这里有一个非常模块化的解决方案,你可以与简单的Guava ListenableFuture一起使用。当然,您不必像我在Futures.allAsList中所做的那样汇集响应,但是您可以在最后执行类似操作并删除CountDownLatch。
顺便说一句,我很确定您的问题是线程问题。您看到的是 COMPLETE,因为在下次调用侦听 (5000) 后正在调用回调。请记住,异步将被线程化,因此输出到控制台可能会延迟到下一次上下文切换。服务器可能在 7500 信号量解锁后立即响应。
private Client client;
@Before
public void setup() {
final ClientConfig clientConfig = new ClientConfig();
clientConfig.register(OrtbBidRequestBodyReader.class);
clientConfig.register(OrtbBidRequestBodyWriter.class);
clientConfig.connectorProvider(new CachingConnectorProvider(new HttpUrlConnectorProvider()));
clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 3);
client = ClientBuilder.newClient(clientConfig);
}
@Test
public void testAsync() throws InterruptedException, ExecutionException, JsonProcessingException {
final WebTarget target = client
.target("http://localhost:8081/dsp-receiver-0.0.1-SNAPSHOT/ortb/bid/123123?testbid=bid");
final AtomicInteger successcount = new AtomicInteger();
final AtomicInteger noBid = new AtomicInteger();
final AtomicInteger clientError = new AtomicInteger();
final InvocationCallback<Response> callback = new InvocationCallback<Response>() {
@Override
public void completed(final Response response) {
if (response.getStatus() == 200) {
successcount.incrementAndGet();
} else if (response.getStatus() == 204) {
noBid.incrementAndGet();
} else {
clientError.incrementAndGet();
}
}
@Override
public void failed(final Throwable e) {
clientError.incrementAndGet();
logger.info("Client Error", e);
}
};
final Entity<OrtbBidRequest> entity = Entity.entity(testBidRequest, MediaType.APPLICATION_JSON);
final List<ListenableFuture<Response>> allFutures = Lists.newArrayList();
final Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < 100000; i++) {
logger.info("Running request {}", i);
final Future<Response> future = target.request().accept(MediaType.APPLICATION_JSON).async().post(entity,
callback);
final ListenableFuture<Response> response = JdkFutureAdapters.listenInPoolThread(future);
allFutures.add(response);
// For each 100 of requests we will wait on them, otherwise we
// may run out of memory. This is really just to test the stamina
// of the dsp
if (i % 200 == 0) {
Futures.allAsList(allFutures).get();
allFutures.clear();
}
}
logger.info("success count {} nobid {} client error {} ", successcount, noBid, clientError);
logger.info("Total time {} ms ", stopwatch.stop());
}