在使用 Apache 'HttpClient' 的 PUT 操作期间更快地检测中断的连接



我使用Apache HttpClient 4与REST API通信,大多数时候我做冗长的PUT操作。由于这些可能发生在不稳定的互联网连接上,我需要检测连接是否中断,并且可能需要重试(使用恢复请求)。

为了在现实世界中尝试我的例程,我开始了一个PUT操作,然后我翻转了笔记本电脑的Wi-Fi开关,导致任何数据流立即完全中断。然而,它需要很长时间(可能5分钟左右),直到最终抛出SocketException。

如何加快处理速度?我想设置一个30秒左右的超时时间

更新:

澄清一下,我的请求是PUT操作。因此,在很长一段时间内(可能是几个小时),唯一的操作是write()操作,没有读操作。有一个read()操作的超时设置,但是我找不到一个写操作的超时设置。

我正在使用我自己的实体实现,因此我直接写入OutputStream,一旦互联网连接中断,它将立即阻塞。如果OutputStreams有一个超时参数,所以我可以写out.write(nextChunk, 30000);,我可以自己检测这样的问题。实际上我试过了:

public class TimeoutHttpEntity extends HttpEntityWrapper {
  public TimeoutHttpEntity(HttpEntity wrappedEntity) {
    super(wrappedEntity);
  }
  @Override
  public void writeTo(OutputStream outstream) throws IOException {
    try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) {
      super.writeTo(wrapper);
    }
  }
}

public class TimeoutOutputStreamWrapper extends OutputStream {
  private final OutputStream delegate;
  private final long timeout;
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
  public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) {
    this.delegate = delegate;
    this.timeout = timeout;
  }
  @Override
  public void write(int b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }
  @Override
  public void write(byte[] b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b, off, len);
      return null;
    });
  }
  @Override
  public void close() throws IOException {
    try {
      executeWithTimeout(() -> {
        delegate.close();
        return null;
      });
    } finally {
      executorService.shutdown();
    }
  }
  private void executeWithTimeout(final Callable<?> task) throws IOException {
    try {
      executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IOException(e);
    } catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException)cause;
      }
      throw new Error(cause);
    } catch (InterruptedException e) {
      throw new Error(e);
    }
  }
}
public class TimeoutOutputStreamWrapperTest {
  private static final byte[] DEMO_ARRAY = new byte[]{1,2,3};
  private TimeoutOutputStreamWrapper streamWrapper;
  private OutputStream delegateOutput;
  public void setUp(long timeout) {
    delegateOutput = mock(OutputStream.class);
    streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout);
  }
  @AfterMethod
  public void teardown() throws Exception {
    streamWrapper.close();
  }
  @Test
  public void write_writesByte() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);
    // Execution
    streamWrapper.write(DEMO_ARRAY);
    // Evaluation
    verify(delegateOutput).write(DEMO_ARRAY);
  }
  @Test(expectedExceptions = DemoIOException.class)
  public void write_passesThruException() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);
    doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY);
    // Execution
    streamWrapper.write(DEMO_ARRAY);
    // Evaluation performed by expected exception
  }
  @Test(expectedExceptions = IOException.class)
  public void write_throwsIOException_onTimeout() throws Exception {
    // Setup
    final CountDownLatch executionDone = new CountDownLatch(1);
    setUp(100);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        executionDone.await();
        return null;
      }
    }).when(delegateOutput).write(DEMO_ARRAY);
    // Execution
    try {
      streamWrapper.write(DEMO_ARRAY);
    } finally {
      executionDone.countDown();
    }
    // Evaluation performed by expected exception
  }
  public static class DemoIOException extends IOException {
  }
}

这有点复杂,但它在我的单元测试中工作得很好。除了HttpRequestExecutor在第127行捕获异常并试图关闭连接之外,它在实际生活中也可以工作。但是,当尝试关闭连接时,它首先尝试刷新再次阻塞的连接。

我可能能够更深入地挖掘HttpClient,并找出如何防止这种刷新操作,但它已经是一个不太漂亮的解决方案,它只是要变得更糟。

:

看起来这不能在Java级别上完成。我能在另一层做吗?(我用的是Linux)

Java阻塞I/O不支持写操作的套接字超时。您完全可以由OS/JRE来解除被写操作阻塞的线程的阻塞。而且,这种行为往往是特定于OS/JRE的。

考虑使用基于非阻塞I/O (NIO)的HTTP客户端(如Apache HttpAsyncClient)可能是一个合理的情况。

您可以使用RequestConfig:

配置套接字超时
RequestConfig myRequestConfig = RequestConfig.custom()
    .setSocketTimeout(5000)  // 5 seconds
    .build();

当你调用时,只需分配你的新配置。例如,

HttpPut httpPut = new HttpPut("...");
httpPut.setConfig(requestConfig);
...
HttpClientContext context = HttpClientContext.create();
....
httpclient.execute(httpPut, context);

关于超时配置的更多信息,这里有一个很好的解释

她是我遇到的谈论connection eviction policy的链接之一:在这里

public static class IdleConnectionMonitorThread extends Thread {
private final HttpClientConnectionManager connMgr;
private volatile boolean shutdown;
public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
    super();
    this.connMgr = connMgr;
}
@Override
public void run() {
    try {
        while (!shutdown) {
            synchronized (this) {
                wait(5000);
                // Close expired connections
                connMgr.closeExpiredConnections();
                // Optionally, close connections
                // that have been idle longer than 30 sec
                connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
            }
        }
    } catch (InterruptedException ex) {
        // terminate
    }
}
public void shutdown() {
    shutdown = true;
    synchronized (this) {
        notifyAll();
    }
}}

我想你可能会想看看这个

相关内容

  • 没有找到相关文章

最新更新