通过Tomcat servlet代理常规HTTP和WebSocket



我正在实现一个Web应用程序,它的功能之一是显示由后端服务提供、经代理转发的网页,并与这些网页交互。为此,我使用了HTTP-Proxy-Servlet,它在大多数情况下工作得很好。

但是,某些后端服务的网页使用websockets,而上面的代理servlet不支持websockets。

我尝试自己实现:向后端重新发起websocket握手请求,然后在两端的流之间复制数据,但这行不通。浏览器报告"无效的帧头",而Tomcat则报出如下错误:
Error parsing HTTP request header
Invalid character found in method name. HTTP method names must be tokens
at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:414)

我的代码:

import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import javax.servlet.ServletException;
import javax.servlet.http.*;
import org.apache.http.HttpRequest;
import org.mitre.dsmiley.httpproxy.ProxyServlet;
/**
 * Proxy servlet that handles ordinary requests through {@link ProxyServlet} and tries to tunnel
 * WebSocket handshakes to the backend over a raw socket.
 *
 * <p>This variant relays the backend's handshake response through the normal servlet response and
 * then copies bytes between the servlet streams and the backend socket from inside
 * {@link #service}. It never asks the container to upgrade the connection.
 */
public class ProxyWithWebSocket extends ProxyServlet {
    private static final long serialVersionUID = -2566573965489129976L;

    /** Runs the two byte-copy tasks of each tunnelled connection. */
    protected ExecutorService exec;

    @Override
    public void init() throws ServletException {
        super.init();
        exec = Executors.newCachedThreadPool();
    }

    @Override
    public void destroy() {
        super.destroy();
        exec.shutdown();
    }

    /**
     * Dispatches a request: anything carrying a {@code Sec-WebSocket-Key} header is tunnelled to
     * the backend, everything else is delegated to the parent proxy implementation.
     *
     * @param servletRequest  the incoming client request
     * @param servletResponse the response sent back to the client
     * @throws ServletException if the parent proxy implementation fails
     * @throws IOException      if talking to the client or the backend fails, or the backend sends
     *                          a malformed or truncated response head
     */
    @Override
    protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
            throws ServletException, IOException {
        var wsKey = servletRequest.getHeader("Sec-WebSocket-Key");
        if (wsKey == null) {
            super.service(servletRequest, servletResponse);
            return;
        }

        // initialize request attributes from caches if unset by a subclass by this point
        if (servletRequest.getAttribute(ATTR_TARGET_URI) == null) {
            servletRequest.setAttribute(ATTR_TARGET_URI, targetUri);
        }
        if (servletRequest.getAttribute(ATTR_TARGET_HOST) == null) {
            servletRequest.setAttribute(ATTR_TARGET_HOST, targetHost);
        }
        String proxyRequestUri = rewriteUrlFromRequest(servletRequest);
        URL u = new URL(proxyRequestUri);
        // URL.getPort() is -1 when the URL carries no explicit port; Socket rejects that value.
        int port = u.getPort() != -1 ? u.getPort() : u.getDefaultPort();
        // An empty path would yield the invalid request line "GET  HTTP/1.1".
        String file = u.getFile().isEmpty() ? "/" : u.getFile();

        var servletIn = servletRequest.getInputStream();
        var servletOut = servletResponse.getOutputStream();
        try (Socket sock = new Socket(u.getHost(), port)) {
            var sockIn = sock.getInputStream();
            var sockOut = sock.getOutputStream();

            // Replay the client's handshake request against the backend.
            StringBuilder req = new StringBuilder(512);
            req.append("GET ").append(file).append(" HTTP/1.1");
            System.out.println("  > WS|" + req);
            req.append("\r\n");
            var en = servletRequest.getHeaderNames();
            while (en.hasMoreElements()) {
                var n = en.nextElement();
                String header = servletRequest.getHeader(n);
                System.out.println("  > WS| " + n + ": " + header);
                req.append(n).append(": ").append(header).append("\r\n");
            }
            req.append("\r\n");

            sockOut.write(req.toString().getBytes(StandardCharsets.UTF_8));
            sockOut.flush();

            // Read the response head one byte at a time so nothing past the terminating blank
            // line (i.e. the first WebSocket frames) is consumed here.
            StringBuilder responseBytes = new StringBuilder(512);
            boolean headComplete = false;
            int b;
            while ((b = sockIn.read()) != -1) {
                responseBytes.append((char) b);
                int len = responseBytes.length();
                if (len >= 4
                        && responseBytes.charAt(len - 4) == '\r'
                        && responseBytes.charAt(len - 3) == '\n'
                        && responseBytes.charAt(len - 2) == '\r'
                        && responseBytes.charAt(len - 1) == '\n') {
                    headComplete = true;
                    break;
                }
            }
            if (!headComplete) {
                throw new IOException("Backend closed the connection before completing its response head");
            }

            String[] rows = responseBytes.toString().split("\r\n");

            String response = rows[0];
            System.out.println("  < WS|" + response);

            // Status line is "HTTP-version SP status-code [SP reason-phrase]".
            int idx1 = response.indexOf(' ');
            if (idx1 < 0) {
                throw new IOException("Malformed status line from backend: " + response);
            }
            int idx2 = response.indexOf(' ', idx1 + 1);
            String codeText = idx2 < 0 ? response.substring(idx1 + 1) : response.substring(idx1 + 1, idx2);
            int status;
            try {
                status = Integer.parseInt(codeText.trim());
            } catch (NumberFormatException ex) {
                throw new IOException("Malformed status line from backend: " + response, ex);
            }

            for (int i = 1; i < rows.length; i++) {
                String line = rows[i];
                int idx3 = line.indexOf(':');
                if (idx3 <= 0) {
                    // Not a "name: value" line; skip it instead of failing on substring().
                    continue;
                }
                var k = line.substring(0, idx3);
                // Whitespace around the value is optional, so trim rather than assume one space.
                var headerField = line.substring(idx3 + 1).trim();
                System.out.println("  < WS| " + k + ": " + headerField);
                servletResponse.setHeader(k, headerField);
            }

            servletResponse.setStatus(status);
            servletResponse.flushBuffer();

            System.out.println("  < WS| Flush");

            // Client -> backend
            var f1 = exec.submit(() -> {
                var c = 0;

                var bs = 0;
                while ((bs = servletIn.read()) != -1) {
                    sockOut.write(bs);
                    c++;
                }
                System.out.println("  > WS| Done: " + c);
                return null;
            });
            // Backend -> client
            var f2 = exec.submit(() -> {
                var c = 0;

                var bs = 0;
                while ((bs = sockIn.read()) != -1) {
                    servletOut.write(bs);
                    servletOut.flush();
                    c++;
                }
                System.out.println("  < WS| Done: " + c);
                return null;
            });

            try {
                f1.get();
                f2.get();
            } catch (InterruptedException ex) {
                // Preserve the interrupt for the container thread that called us.
                Thread.currentThread().interrupt();
            } catch (ExecutionException ex) {
                // One direction failed; the tunnel is finished. Leaving the try-with-resources
                // block closes the socket, which ends the other direction as well.
            } finally {
                // No-ops for tasks that already completed.
                f1.cancel(true);
                f2.cancel(true);
            }
        }
    }
}

典型的交换是这样的(通过那些println):

> WS|GET /cellhub?id=NhWO8SnGyDb_Vrk23rmhVQ HTTP/1.1
> WS| host: localhost:8080
> WS| connection: Upgrade
> WS| pragma: no-cache
> WS| cache-control: no-cache
> WS| user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36
> WS| upgrade: websocket
> WS| origin: http://localhost:8080
> WS| sec-websocket-version: 13
> WS| accept-encoding: gzip, deflate, br
> WS| accept-language: hu,hu-HU;q=0.9,en-US;q=0.8,en;q=0.7
> WS| cookie: JSESSIONID=57E4B30452BC3EB2657139DAF70E65AD; JSESSIONID=AD5E7BB5FE17B4072F3ABEE32B9479AC
> WS| sec-websocket-key: nrZWEb6Co4DKggUNwPeV8g==
> WS| sec-websocket-extensions: permessage-deflate; client_max_window_bits
< WS|HTTP/1.1 101 Switching Protocols
< WS| Connection:  Upgrade
< WS| Date:  Thu, 07 Oct 2021 13:18:41 GMT
< WS| Server:  Kestrel
< WS| Upgrade:  websocket
< WS| Sec-WebSocket-Accept:  /9uN8ZF67WepGJQ3+DPBLMCBotc=
< WS| Flush
> WS| Done: 0
< WS| Done: 42

我怎样才能使它工作?

编辑

我发现HttpServletRequest.upgrade方法似乎就是用来切换协议的。我修改了复制响应头之后的那部分代码:

// Numeric status code, cut out from between the first two spaces of the backend's status line.
int respCode = Integer.parseInt(response.substring(idx1 + 1, idx2));
if (respCode == 101) {
    // Backend switched protocols: have the container upgrade the client connection and pass
    // the backend socket and its streams on to the handler instance it created.
    var uh = servletRequest.upgrade(WsUpgradeHandler.class);
    uh.preInit(exec, sockIn, sockOut, sock);
} else {
    // Any other status is relayed to the client as-is and the backend socket is flagged for closing.
    servletResponse.setStatus(respCode);
    servletResponse.flushBuffer();
    System.out.println("  < WS| Flush");
    closeSocket = true;
}

其中WsUpgradeHandler

/**
 * Upgrade handler that tunnels bytes between the upgraded client connection and a backend
 * socket that was opened beforehand.
 *
 * <p>The handler is created through its no-arg constructor, so its collaborators are supplied
 * afterwards via {@link #preInit}. In this version {@link #init} submits both copy directions
 * to the executor and returns immediately.
 */
public static class WsUpgradeHandler implements HttpUpgradeHandler {
    ExecutorService exec;
    InputStream sockIn;
    OutputStream sockOut;
    Socket sock;
    /** Client -> backend copy task; {@code null} until {@link #init} has submitted it. */
    Future<?> f1;
    /** Backend -> client copy task; {@code null} until {@link #init} has submitted it. */
    Future<?> f2;

    public WsUpgradeHandler() { }

    /**
     * Supplies the backend connection and the executor that runs the copy tasks.
     *
     * @param exec    executor used for both copy directions
     * @param sockIn  stream delivering bytes from the backend
     * @param sockOut stream accepting bytes for the backend
     * @param sock    the backend socket, closed in {@link #destroy}
     */
    public void preInit(ExecutorService exec, InputStream sockIn, OutputStream sockOut, Socket sock) {
        this.exec = exec;
        this.sockIn = sockIn;
        this.sockOut = sockOut;
        this.sock = sock;
    }

    /**
     * Starts both copy directions on the executor and returns without waiting for them.
     *
     * @param wc the upgraded client connection
     */
    @Override
    public void init(WebConnection wc) {
        System.out.println("  * WS| Upgrade begin");
        try {
            var servletIn = wc.getInputStream();
            var servletOut = wc.getOutputStream();
            f1 = exec.submit(() -> {
                System.out.println("  > WS| Client -> Backend");
                var c = 0;

                var bs = 0;
                try {
                    while ((bs = servletIn.read()) != -1) {
                        sockOut.write(bs);
                        c++;
                    }
                } catch (Exception exc) {
                    exc.printStackTrace();
                } finally {
                    sockOut.close();
                }
                System.out.println("  > WS| Done: " + c);
                return null;
            });
            f2 = exec.submit(() -> {
                System.out.println("  > WS| Backend -> Client");
                var c = 0;

                try {
                    var bs = 0;
                    while ((bs = sockIn.read()) != -1) {
                        servletOut.write(bs);
                        // Flush per byte so frames are not held back in the container's buffer.
                        servletOut.flush();
                        c++;
                    }
                } catch (Exception exc) {
                    exc.printStackTrace();
                } finally {
                    servletOut.close();
                }
                System.out.println("  < WS| Done: " + c);
                return null;
            });
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Cancels whichever copy tasks were started and closes the backend socket. */
    @Override
    public void destroy() {
        System.out.println("  * WS| Upgrade closing");
        // init() may have failed before either task was submitted, so guard against null.
        if (f1 != null) {
            f1.cancel(true);
        }
        if (f2 != null) {
            f2.cancel(true);
        }
        if (sock != null) {
            try {
                sock.close();
            } catch (IOException ignored) {
                // Best-effort close during teardown; nothing useful can be done on failure.
            }
        }
        System.out.println("  * WS| Upgrade close");
    }

}

这样确实可以传递消息,但是一旦浏览器端的websocket连接结束,Tomcat的CPU占用率就会变得非常高(而此时本不应该有任何其他活动)。看起来Tomcat的部分或全部NIO线程都在空转,而我使用的线程池里已经没有线程了。

我想我已经解决了这个问题。

上面的代码几乎是正确的,只有一个例外:显然init()方法在使用阻塞模式时不应该返回,正如这个Tomcat测试示例所示。

第二个问题,即CPU占用率过高,可以追溯到Tomcat中的一个轮询线程,该线程在旧版本中存在bug。我原先在Tomcat 9.0.12中运行我的代码,升级到Tomcat 9.0.54之后,CPU占用问题就消失了。

因此,完整的可运行代码如下:(我知道,我知道,逐字节搬运数据和手工拼装HTTP请求并不是最佳做法,不过这正是Loom的用武之地,对吧;)

import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import javax.servlet.ServletException;
import javax.servlet.http.*;
import org.apache.http.HttpRequest;
import org.mitre.dsmiley.httpproxy.ProxyServlet;
/**
 * Proxy servlet that forwards ordinary HTTP through {@link ProxyServlet} and tunnels WebSocket
 * connections to the backend over a raw socket.
 *
 * <p>For a WebSocket handshake the servlet replays the request against the backend, relays the
 * backend's response headers and, when the backend answers {@code 101}, upgrades the client
 * connection and hands the backend socket to {@link WsUpgradeHandler}.
 */
public class ProxyWithWebSocket extends ProxyServlet {
    private static final long serialVersionUID = -2566573965489129976L;
    private static final String CRLF = "\r\n";

    /** Runs the backend-to-client copy task of each tunnelled connection. */
    protected ExecutorService exec;

    @Override
    public void init() throws ServletException {
        super.init();
        exec = Executors.newCachedThreadPool();
    }

    @Override
    public void destroy() {
        super.destroy();
        exec.shutdown();
    }

    /**
     * Copies the request headers as the parent does and additionally forwards the
     * {@code UserID} request attribute, when present, as a header of the same name.
     */
    @Override
    protected void copyRequestHeaders(HttpServletRequest servletRequest, HttpRequest proxyRequest) {
        super.copyRequestHeaders(servletRequest, proxyRequest);

        String userId = (String) servletRequest.getAttribute("UserID");
        if (userId != null) {
            proxyRequest.addHeader("UserID", userId);
        }
    }

    /**
     * Dispatches a request: anything carrying a {@code Sec-WebSocket-Key} header is tunnelled to
     * the backend, everything else is delegated to the parent proxy implementation.
     *
     * @param servletRequest  the incoming client request
     * @param servletResponse the response sent back to the client
     * @throws ServletException if the parent implementation or the upgrade fails
     * @throws IOException      if talking to the client or the backend fails, or the backend sends
     *                          a malformed or truncated response head
     */
    @Override
    protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
            throws ServletException, IOException {
        if (servletRequest.getHeader("Sec-WebSocket-Key") == null) {
            super.service(servletRequest, servletResponse);
            return;
        }

        // initialize request attributes from caches if unset by a subclass by this point
        if (servletRequest.getAttribute(ATTR_TARGET_URI) == null) {
            servletRequest.setAttribute(ATTR_TARGET_URI, targetUri);
        }
        if (servletRequest.getAttribute(ATTR_TARGET_HOST) == null) {
            servletRequest.setAttribute(ATTR_TARGET_HOST, targetHost);
        }
        String proxyRequestUri = rewriteUrlFromRequest(servletRequest);
        URL u = new URL(proxyRequestUri);
        // URL.getPort() is -1 when the URL carries no explicit port; Socket rejects that value.
        int port = u.getPort() != -1 ? u.getPort() : u.getDefaultPort();
        // An empty path would yield the invalid request line "GET  HTTP/1.1".
        String file = u.getFile().isEmpty() ? "/" : u.getFile();

        Socket sock = new Socket(u.getHost(), port);
        // The socket may only stay open past this method once the upgrade handler owns it;
        // on every other path, including exceptions, it is closed below.
        boolean handedOff = false;
        try {
            var sockIn = sock.getInputStream();
            var sockOut = sock.getOutputStream();

            // Replay the client's handshake request against the backend.
            StringBuilder req = new StringBuilder(512);
            req.append("GET ").append(file).append(" HTTP/1.1");
            System.out.println("  > WS|" + req);
            req.append(CRLF);
            var en = servletRequest.getHeaderNames();
            while (en.hasMoreElements()) {
                var n = en.nextElement();
                String header = servletRequest.getHeader(n);
                System.out.println("  > WS| " + n + ": " + header);
                req.append(n).append(": ").append(header).append(CRLF);
            }
            req.append(CRLF);

            sockOut.write(req.toString().getBytes(StandardCharsets.UTF_8));
            sockOut.flush();

            String[] rows = readResponseHead(sockIn).split(CRLF);

            String response = rows[0];
            System.out.println("  < WS|" + response);
            int respCode = parseStatusCode(response);

            for (int i = 1; i < rows.length; i++) {
                String line = rows[i];
                int idx3 = line.indexOf(':');
                if (idx3 <= 0) {
                    // Not a "name: value" line; skip it instead of failing on substring().
                    continue;
                }
                var k = line.substring(0, idx3);
                // Whitespace around the value is optional, so trim rather than assume one space.
                var headerField = line.substring(idx3 + 1).trim();
                System.out.println("  < WS| " + k + ": " + headerField);
                servletResponse.setHeader(k, headerField);
            }

            if (respCode != 101) {
                // Backend refused the upgrade: relay status and headers only.
                // NOTE(review): a response body from the backend, if any, is not relayed.
                servletResponse.setStatus(respCode);
                servletResponse.flushBuffer();
                System.out.println("  < WS| Flush");
            } else {
                var uh = servletRequest.upgrade(WsUpgradeHandler.class);
                uh.preInit(exec, sockIn, sockOut, sock);
                handedOff = true;
            }
        } finally {
            if (!handedOff) {
                sock.close();
            }
        }
    }

    /**
     * Reads bytes up to and including the blank line that ends an HTTP response head.
     * Reading is done one byte at a time so that nothing following the head (the first
     * WebSocket frames) is consumed.
     *
     * @param in stream positioned at the start of the response
     * @return the head, including the terminating CRLF CRLF
     * @throws IOException if the stream ends before the head is complete
     */
    private static String readResponseHead(InputStream in) throws IOException {
        StringBuilder head = new StringBuilder(512);
        int b;
        while ((b = in.read()) != -1) {
            head.append((char) b);
            int len = head.length();
            if (len >= 4
                    && head.charAt(len - 4) == '\r'
                    && head.charAt(len - 3) == '\n'
                    && head.charAt(len - 2) == '\r'
                    && head.charAt(len - 1) == '\n') {
                return head.toString();
            }
        }
        throw new IOException("Backend closed the connection before completing its response head");
    }

    /**
     * Extracts the numeric status code from a status line of the form
     * {@code HTTP-version SP status-code [SP reason-phrase]}.
     *
     * @param statusLine first line of the backend's response
     * @return the status code
     * @throws IOException if the line does not contain a numeric status code
     */
    private static int parseStatusCode(String statusLine) throws IOException {
        int start = statusLine.indexOf(' ');
        if (start < 0) {
            throw new IOException("Malformed status line from backend: " + statusLine);
        }
        int end = statusLine.indexOf(' ', start + 1);
        String code = end < 0 ? statusLine.substring(start + 1) : statusLine.substring(start + 1, end);
        try {
            return Integer.parseInt(code.trim());
        } catch (NumberFormatException ex) {
            throw new IOException("Malformed status line from backend: " + statusLine, ex);
        }
    }

    /**
     * Upgrade handler that tunnels bytes between the upgraded client connection and the backend
     * socket.
     *
     * <p>The handler is created through its no-arg constructor, so its collaborators are supplied
     * afterwards via {@link #preInit}. {@link #init} copies client-to-backend traffic on the
     * calling thread and does not return until the tunnel has ended; backend-to-client traffic
     * is copied on the executor.
     */
    public static class WsUpgradeHandler implements HttpUpgradeHandler {
        private static final int BUFFER_SIZE = 8192;

        ExecutorService exec;
        InputStream sockIn;
        OutputStream sockOut;
        Socket sock;
        /** Backend -> client copy task; {@code null} until {@link #init} has submitted it. */
        Future<?> f2;

        public WsUpgradeHandler() { }

        /**
         * Supplies the backend connection and the executor that runs the backend-to-client copy.
         *
         * @param exec    executor used for the backend-to-client direction
         * @param sockIn  stream delivering bytes from the backend
         * @param sockOut stream accepting bytes for the backend
         * @param sock    the backend socket, closed in {@link #destroy}
         */
        public void preInit(ExecutorService exec, InputStream sockIn, OutputStream sockOut, Socket sock) {
            this.exec = exec;
            this.sockIn = sockIn;
            this.sockOut = sockOut;
            this.sock = sock;
        }

        /**
         * Runs the tunnel. Blocks until the client-to-backend direction has ended and the
         * backend-to-client task has completed.
         *
         * @param wc the upgraded client connection
         */
        @Override
        public void init(WebConnection wc) {
            System.out.println("  * WS| Upgrade begin");
            try {
                var servletIn = wc.getInputStream();
                var servletOut = wc.getOutputStream();
                f2 = exec.submit(() -> {
                    System.out.println("  > WS| Backend -> Client");
                    long toClient = 0;

                    try {
                        byte[] buf = new byte[BUFFER_SIZE];
                        int n;
                        while ((n = sockIn.read(buf)) != -1) {
                            servletOut.write(buf, 0, n);
                            // Flush every chunk so frames are not held back in a buffer.
                            servletOut.flush();
                            toClient += n;
                        }
                    } catch (SocketException | EOFException exc) {
                        // this is fine: one side closed the connection
                    } catch (Exception exc) {
                        exc.printStackTrace();
                    } finally {
                        servletOut.close();
                    }
                    System.out.println("  < WS| Done: " + toClient);
                    return null;
                });

                System.out.println("  > WS| Client -> Backend");
                long toBackend = 0;
                try {
                    byte[] buf = new byte[BUFFER_SIZE];
                    int n;
                    while ((n = servletIn.read(buf)) != -1) {
                        sockOut.write(buf, 0, n);
                        toBackend += n;
                    }
                } catch (SocketException | EOFException exc) {
                    // this is fine: one side closed the connection
                } catch (Exception exc) {
                    exc.printStackTrace();
                } finally {
                    // Closing the socket's output stream closes the socket, which also ends the
                    // backend-to-client task's blocking read.
                    sockOut.close();
                }
                System.out.println("  > WS| Done: " + toBackend);
                f2.get();
            } catch (InterruptedException ex) {
                // Preserve the interrupt for the container thread that called us.
                Thread.currentThread().interrupt();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (f2 != null) {
                    f2.cancel(true);
                }
            }
        }

        /** Cancels the backend-to-client task if it was started and closes the backend socket. */
        @Override
        public void destroy() {
            System.out.println("  * WS| Upgrade closing");
            if (f2 != null) {
                f2.cancel(true);
            }
            if (sock != null) {
                try {
                    sock.close();
                } catch (IOException ignored) {
                    // Best-effort close during teardown; nothing useful can be done on failure.
                }
            }
            System.out.println("  * WS| Upgrade close");
        }

    }
}