CXF - Retrieving the SOAP response XML from a generated client



I'm currently using wsdl-to-java to generate asynchronous client code, which is used to query a SOAP web service. Here is a snippet of the generated async method:

@WebMethod(operationName = "GetSession")
public Future<?> getSessionAsync(
@WebParam(partName = "parameters", name = "GetSessionRequest")
mynamespace.GetSessionRequest parameters,
@WebParam(partName = "ResponseHeader", mode = WebParam.Mode.OUT, name = "ResponseHeader", header = true)
javax.xml.ws.Holder<mydatacontract.ResponseHeader> responseHeader,
@WebParam(name = "asyncHandler", targetNamespace = "")
AsyncHandler<myservice.GetSessionResponse> asyncHandler
);

I call the generated code above from a wrapper class:

getSession(GetSessionRequest request) {
Future<?> response = generatedClient.getSessionAsync(request, responseHeader, handler)
}
handler(Response<GetSessionResponse> response) {
// no access to SOAP XML at this point?
}

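As I understand it, the generated code takes care of serialization/deserialization, and I have no access to the raw SOAP response. There are ways to log the SOAP XML response, as described here, but I need to access it in code because the response has to be dumped into a database.

Is there a way to access it in the handler without touching the generated client code?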

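Update:

I was able to read the ResponseContext, which is of type java.util.Map<String, Object>. But it does not return the raw SOAP XML I'm looking for.

Also, using an inbound Interceptor means I lose the context of the calling function. That context is required to store the XML response against each call in the database.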

Update 2:

The Future returns an object of type Response, which is defined in Response.java. Its Javadoc states:

The interface provides methods used to obtain the
payload and context of a message sent in response to an operation
invocation.

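However, I can only retrieve the Context; there is no property that gives access to the payload.

I found an SO answer here that has a solution for Axis. Is something similar possible in CXF?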

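The problem: SOAP messages can be intercepted, but in a very different place from the original call site. It is difficult to pass the SOAP messages back to the original call site, especially in a multithreaded or asynchronous environment.

The only solution I can see is to make sure that, for any given request, there is exactly one JAX-WS proxy with exactly one handler, so that everything the handler captures belongs to that request. A single proxy for the whole application would be a bottleneck, so multithreading tools are needed to allow parallel and asynchronous execution.

Here's my idea, in code. First I step through it, and at the end there's a dump of all the code.

Update: I've replaced the LinkedBlockingQueue with a static ThreadLocal<SoapApiWrapper> instance, and an executor created with newWorkStealingPool(). See the edit history for the changes!

I've set it up to use http://www.dneonline.com/calculator.asmx. It compiles and runs, but I haven't spent much time making sure it works correctly or optimally. I'm sure there are problems (my CPU fan is working hard even when I'm not running the code). Be warned!

(Does anyone know of a better public SOAP API that I can run locally or flood with requests? If you want to test, here are some public SOAP APIs: https://documenter.getpostman.com/view/8854915/Szf26WHn)

Step by step

  1. Implement a SOAPHandler class that captures the messages, called SoapMessageHandler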

    public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
    // capture messages in a list
    private final List<SOAPMessageContext> messages = new ArrayList<>();
    // get & clear messages
    public List<SOAPMessageContext> collectMessages() {
    var m = new ArrayList<>(messages);
    messages.clear();
    return m;
    }
    @Override
    public boolean handleMessage(SOAPMessageContext context) {
    messages.add(context); // collect message
    return true;
    }
    @Override
    public boolean handleFault(SOAPMessageContext context) {
    messages.add(context); // collect error
    return true;
    }
    }
    
  2. Define a SoapApiWrapper that

    1. creates a SoapMessageHandler,
    2. creates a JAX-WS proxy,
    3. and adds the handler to the proxy.
    class SoapApiWrapper {
    // 1. create a handler    
    private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
    private final CalculatorSoap connection;
    public SoapApiWrapper() {
    // 2. create one connection
    var factoryBean = new JaxWsProxyFactoryBean();
    factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
    factoryBean.setServiceClass(CalculatorSoap.class);
    // 3. add the Handler
    factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
    connection = factoryBean.create(CalculatorSoap.class);
    }
    }
    
  3. Define a SoapApiManager that has

    1. an ExecutorService, which will manage the SOAP requests and responses
    2. a ThreadLocal<SoapApiWrapper>, so each thread has its own JAX-WS proxy (idea from https://stackoverflow.com/a/16680215/4161471)
    public class SoapApiManager   {
    // 1. request executor
    private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
    private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
    }
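
To make the one-proxy-per-thread idea concrete, here is a minimal sketch that uses only JDK classes. FakeWrapper is a hypothetical stand-in for SoapApiWrapper (it builds no real proxy), and the pool size of 3 is an arbitrary assumption. It records which wrapper instance each worker thread sees, so you can check that a given thread always reuses the same one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PerThreadProxyDemo {
    // Hypothetical stand-in for SoapApiWrapper: each new instance gets a fresh id.
    static final class FakeWrapper {
        static final AtomicInteger CREATED = new AtomicInteger();
        final int id = CREATED.incrementAndGet();
    }

    static final int THREAD_LIMIT = 3; // assumed value, for illustration only
    static final ThreadLocal<FakeWrapper> WRAPPER = ThreadLocal.withInitial(FakeWrapper::new);

    /** Runs taskCount tasks and records which wrapper ids each worker thread observed. */
    static Map<Thread, Set<Integer>> run(int taskCount) {
        ExecutorService pool = Executors.newWorkStealingPool(THREAD_LIMIT);
        Map<Thread, Set<Integer>> seen = new ConcurrentHashMap<>();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    // The first get() on a thread creates its wrapper; later calls reuse it.
                    seen.computeIfAbsent(Thread.currentThread(), t -> ConcurrentHashMap.newKeySet())
                        .add(WRAPPER.get().id);
                }, pool));
            }
            futures.forEach(CompletableFuture::join); // wait for every task
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        run(50).forEach((thread, ids) -> System.out.println(thread.getName() + " -> " + ids));
    }
}
```

Each worker thread should print a set containing a single id. That is the property the real SoapApiManager relies on: a handler attached to a thread's proxy only ever sees messages from calls made on that thread.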
    
  4. SoapApiManager has one method, submitRequest(...). It returns the SOAP API response and the SOAP messages.

    public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
    SoapRequestRunner<ResponseT> requestRunner
    ) {
    //...
    }
    

    The parameter is a SoapRequestRunner, a lambda that accepts the JAX-WS proxy and returns the SOAP response.

    @FunctionalInterface
    interface SoapRequestRunner<ResponseT> {
    ResponseT sendRequest(CalculatorSoap calculatorSoap);
    }
    

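    When called, submitRequest(...) does the following:

    1. wraps the SoapRequestRunner in CompletableFuture.supplyAsync(...), using our ExecutorService
    2. fetches a SoapApiWrapper from the ThreadLocal
    3. calls the SOAP API (by applying the SoapRequestRunner to the SoapApiWrapper's JAX-WS proxy)
    4. waits for the SOAP result,
    5. extracts the SOAP messages from the SoapApiWrapper's SOAPHandler,
    6. and finally bundles the SOAP result and the SOAP messages into a DTO, SoapResponseHolder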
    public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
    SoapRequestRunner<ResponseT> requestRunner
    ) { // 1. use CompletableFuture & executorService
    return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
    }
    private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
    SoapRequestRunner<ResponseT> requestRunner
    ) {
    return () -> {
    SoapApiWrapper api = soapApiWrapper.get(); // 2. fetch this thread's API wrapper
    var response = requestRunner.sendRequest(api.connection); // 3&4. request & response
    var messages = api.soapMessageHandler.collectMessages(); // 5. extract raw SOAP messages
    return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO
    };
    } 
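
The six steps above can be exercised without a SOAP service. The following is a rough sketch under stated assumptions, not the answer's actual code: FakeApi is a hypothetical stand-in for the proxy plus its handler (it records two strings per call in place of real SOAP messages), and Holder plays the role of SoapResponseHolder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class CorrelatedCaptureDemo {
    // Hypothetical stand-in for a JAX-WS proxy and its handler.
    static final class FakeApi {
        private final List<String> captured = new ArrayList<>();

        int add(int a, int b) {
            captured.add("<Add>" + a + "," + b + "</Add>");     // "outbound" message
            int sum = a + b;
            captured.add("<AddResult>" + sum + "</AddResult>"); // "inbound" message
            return sum;
        }

        // Returns a snapshot of the captured messages and clears the buffer.
        List<String> collectMessages() {
            List<String> copy = new ArrayList<>(captured);
            captured.clear();
            return copy;
        }
    }

    // Plays the role of SoapResponseHolder: result and messages travel together.
    static final class Holder<T> {
        final T response;
        final List<String> messages;

        Holder(T response, List<String> messages) {
            this.response = response;
            this.messages = messages;
        }
    }

    private static final ExecutorService POOL = Executors.newWorkStealingPool(4);
    private static final ThreadLocal<FakeApi> API = ThreadLocal.withInitial(FakeApi::new);

    static <T> CompletableFuture<Holder<T>> submit(Function<FakeApi, T> call) {
        return CompletableFuture.supplyAsync(() -> {
            FakeApi api = API.get();                               // this thread's own instance
            T response = call.apply(api);                          // blocking call
            return new Holder<>(response, api.collectMessages());  // pair result with messages
        }, POOL);
    }

    public static void main(String[] args) {
        Holder<Integer> holder = submit(api -> api.add(5, 4)).join();
        System.out.println(holder.response + " " + holder.messages);
    }
}
```

One thing worth checking in a real implementation: if the call throws, the messages captured so far stay in the buffer and would be attributed to the next request on that thread. Collecting (or clearing) the buffer in a finally block is one way to avoid that.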
    

Example usage

public class Main {

public static void main(String[] args) {
SoapApiManager apiManager = new SoapApiManager();
apiManager
.submitRequest((soapApi) -> soapApi.add(5, 4))
.thenAccept(response -> {
// we can get the SOAP API response
var sum = response.getResponse();
// and also the intercepted messages!
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml).collect(Collectors.joining("\n---\n"));
System.out.println("sum: " + sum + ",\n" + allXml);
});
}
public static String getRawXml(SOAPMessageContext context) {
try {
ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
context.getMessage().writeTo(byteOS);
return byteOS.toString(StandardCharsets.UTF_8);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
}
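
A caveat about the example above: Executors.newWorkStealingPool is backed by a ForkJoinPool, whose worker threads are daemon threads, so main can return (and the JVM exit) before the thenAccept callback has run. Below is a minimal sketch of waiting for the callback, using only JDK classes. slowAdd is a hypothetical stand-in for the SOAP call, and the 200 ms delay is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class WaitForCallbackDemo {
    // Hypothetical stand-in for a slow remote call.
    static int slowAdd(int a, int b) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a + b;
    }

    /** Submits the call, waits for the callback, and returns what the callback saw. */
    static String runAndWait(int a, int b) {
        ExecutorService pool = Executors.newWorkStealingPool(2);
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> slowAdd(a, b), pool)
                .thenAccept(sum -> seen.set("sum: " + sum));
            done.join(); // block until the callback has finished
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(5, 4));
    }
}
```

In the example usage, the equivalent would be to keep the CompletableFuture returned by thenAccept and call join() on it before main returns.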

Output

sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<Add xmlns="http://tempuri.org/">
<intA>73</intA>
<intB>32</intB>
</Add>
</soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<AddResponse xmlns="http://tempuri.org/">
<AddResult>105</AddResult>
</AddResponse>
</soap:Body>
</soap:Envelope>

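All the code

Here's a working example, which includes validation of the responses.

It creates a large number (REQUESTS_COUNT) of requests and submits them all to the SoapApiManager.

Each request prints the thread name and the hash code of the JAX-WS proxy (I wanted to check that they were being reused), along with the basic input/output (e.g. -9 - 99 = -108).

Validation makes sure that each SoapResponseHolder has the correct result and the raw SOAP messages, and that the correct number of requests was sent.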

Main.java


import com.github.underscore.lodash.Xml;
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class Main implements AutoCloseable {
private final SoapApiManager apiManager = new SoapApiManager();
private static final int THREAD_COUNT = 4;
private static final int REQUESTS_COUNT = 500;
private final AtomicInteger i = new AtomicInteger();
public static void main(String[] args)
throws InterruptedException {
try (var m = new Main()) {
m.run();
}
}
private void run() throws InterruptedException {
var executor = Executors.newFixedThreadPool(THREAD_COUNT);
var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt()))
.limit(REQUESTS_COUNT)
.map(intA -> (Callable<Boolean>) () -> {
sendAndValidateRequest(intA.getKey(), intA.getValue());
i.incrementAndGet();
return true;
})
.collect(Collectors.toList());
executor.invokeAll(tasks);
var waiter = Executors.newSingleThreadScheduledExecutor();
waiter.scheduleWithFixedDelay(
() -> {
var size = i.get();
System.out.println(">waiting... (size " + size + ")");
if (size >= REQUESTS_COUNT) {
System.out.println(">finished waiting! " + size);
waiter.shutdownNow();
}
},
3, 3, TimeUnit.SECONDS
);
System.out.println("Finished sending tasks " + waiter.awaitTermination(10, TimeUnit.SECONDS));
waiter.shutdownNow();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
executor.shutdown();
System.out.println(
"executor.awaitTermination " + executor.awaitTermination(10, TimeUnit.SECONDS));
if (!executor.isTerminated()) {
System.out.println("executor.shutdownNow " + executor.shutdownNow());
}
if (i.get() != REQUESTS_COUNT) {
throw new RuntimeException(
"Test did not execute " + REQUESTS_COUNT + " times, actual: " + i.get()
);
}
}
private int randomInt() {
return ThreadLocalRandom.current().nextInt(-100, 100);
}
private void sendAndValidateRequest(int a, int b) {
apiManager
.submitRequest((soapApi) -> {
var response = soapApi.add(a, b);
System.out.printf(
"[%-12s / %-18s] %4d %s %3d = %4d%n",
soapApi.hashCode(),
Thread.currentThread().getName(),
a,
(b >= 0 ? "+" : "-"),
Math.abs(b),
response
);
return response;
})
.thenAcceptAsync(response -> {
var sum = response.getResponse();
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml)
.collect(Collectors.joining("\n---\n"));
if (sum != a + b) {
throw new RuntimeException(
"Bad sum, sent " + a + " + " + b + ", result: " + sum + ", xml: " + allXml
);
}
if (messages.size() != 2) {
throw new RuntimeException(
"Bad messages, expected 1 request and 1 response, but got " + messages.size()
+ ", xml: " + allXml
);
}
if (!allXml.contains("<AddResult>" + (a + b) + "</AddResult>")) {
throw new RuntimeException(
"Bad result, did not contain AddResult=" + (a + b) + ", actual: " + allXml
);
}
});
}
public static String getRawXml(SOAPMessageContext context) {
try (var byteOS = new ByteArrayOutputStream()) {
context.getMessage().writeTo(byteOS);
var rawSoap = byteOS.toString(StandardCharsets.UTF_8);
return Xml.formatXml(rawSoap, Step.TWO_SPACES);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
apiManager.close();
}
}

SoapApiManager.java


import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Supplier;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;
public class SoapApiManager implements AutoCloseable {
private static final int THREAD_LIMIT = Math.min(Runtime.getRuntime().availableProcessors(), 5);
private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
@Override
public void close() {
executorService.shutdown();
}
private static class SoapApiWrapper {
private final CalculatorSoap connection;
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
public SoapApiWrapper() {
var factoryBean = new JaxWsProxyFactoryBean();
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class);
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
connection = factoryBean.create(CalculatorSoap.class);
}
}
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
SoapRequestRunner<ResponseT> requestRunner
) {
return () -> {
SoapApiWrapper api = soapApiWrapper.get();
var response = requestRunner.sendRequest(api.connection);
var messages = api.soapMessageHandler.collectMessages();
return new SoapResponseHolder<>(response, messages);
};
}
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
public static class SoapResponseHolder<ResponseT> {
private final List<SOAPMessageContext> messages;
private final ResponseT response;
SoapResponseHolder(
ResponseT response,
List<SOAPMessageContext> messages
) {
this.response = response;
this.messages = messages;
}
public ResponseT getResponse() {
return response;
}
public List<SOAPMessageContext> getMessages() {
return messages;
}
}
}

SoapMessageHandler.java

package org.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
private final List<SOAPMessageContext> messages = new ArrayList<>();
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages);
messages.clear();
return m;
}
@Override
public Set<QName> getHeaders() {
return Collections.emptySet();
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public boolean handleFault(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public void close(MessageContext context) {
}
}

build.gradle.kts

plugins {
java
id("com.github.bjornvester.wsdl2java") version "1.2"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4"))
implementation("org.apache.cxf:cxf-core")
implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
implementation("org.apache.cxf:cxf-rt-transports-http")
implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
//  implementation("org.apache.cxf:cxf-rt-transports-http-jetty")
implementation("org.apache.cxf:cxf-rt-transports-http-hc")
implementation("com.sun.activation:javax.activation:1.2.0")
implementation("javax.annotation:javax.annotation-api:1.3.2")
implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")
implementation("com.github.javadev:underscore:1.68")
//<editor-fold desc="JAXB">
implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1")
xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
//</editor-fold>

//<editor-fold desc="Test">
testImplementation(enforcedPlatform("org.junit:junit-bom:5.7.2")) // JUnit 5 BOM
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
//</editor-fold>
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11))
}
}
wsdl2java {
cxfVersion.set("3.4.4")
options.addAll("-xjc-Xequals", "-xjc-XhashCode")
}
tasks.test {
useJUnitPlatform()
}

Take a look at this code snippet (source: https://github.com/apache/cxf/blob/master/core/src/main/java/org/apache/cxf/interceptor/LoggingInInterceptor.java)

import org.apache.cxf.message.Message;

protected void logging(Logger logger, Message message) {
if (message.containsKey(LoggingMessage.ID_KEY)) {
return;
}
String id = (String)message.getExchange().get(LoggingMessage.ID_KEY);
if (id == null) {
id = LoggingMessage.nextId();
message.getExchange().put(LoggingMessage.ID_KEY, id);
}
message.put(LoggingMessage.ID_KEY, id);
final LoggingMessage buffer
= new LoggingMessage("Inbound Message\n----------------------------", id);
if (!Boolean.TRUE.equals(message.get(Message.DECOUPLED_CHANNEL_MESSAGE))) {
// avoid logging the default responseCode 200 for the decoupled responses
Integer responseCode = (Integer)message.get(Message.RESPONSE_CODE);
if (responseCode != null) {
buffer.getResponseCode().append(responseCode);
}
}
String encoding = (String)message.get(Message.ENCODING);
if (encoding != null) {
buffer.getEncoding().append(encoding);
}
String httpMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
if (httpMethod != null) {
buffer.getHttpMethod().append(httpMethod);
}
String ct = (String)message.get(Message.CONTENT_TYPE);
if (ct != null) {
buffer.getContentType().append(ct);
}
Object headers = message.get(Message.PROTOCOL_HEADERS);
if (headers != null) {
buffer.getHeader().append(headers);
}
String uri = (String)message.get(Message.REQUEST_URL);
if (uri == null) {
String address = (String)message.get(Message.ENDPOINT_ADDRESS);
uri = (String)message.get(Message.REQUEST_URI);
if (uri != null && uri.startsWith("/")) {
if (address != null && !address.startsWith(uri)) {
if (address.endsWith("/") && address.length() > 1) {
address = address.substring(0, address.length() - 1);
}
uri = address + uri;
}
} else {
uri = address;
}
}
if (uri != null) {
buffer.getAddress().append(uri);
String query = (String)message.get(Message.QUERY_STRING);
if (query != null) {
buffer.getAddress().append('?').append(query);
}
}
if (!isShowBinaryContent() && isBinaryContent(ct)) {
buffer.getMessage().append(BINARY_CONTENT_MESSAGE).append('\n');
log(logger, buffer.toString());
return;
}
if (!isShowMultipartContent() && isMultipartContent(ct)) {
buffer.getMessage().append(MULTIPART_CONTENT_MESSAGE).append('\n');
log(logger, buffer.toString());
return;
}

The important line:

InputStream is = message.getContent(InputStream.class);

////

if (is != null) {
logInputStream(message, is, buffer, encoding, ct);
} else {
Reader reader = message.getContent(Reader.class);
if (reader != null) {
logReader(message, reader, buffer);
}
}
log(logger, formatLoggingMessage(buffer));
}

So you can create an interceptor and use org.apache.cxf.message.Message to get the content of the message.
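
One detail to watch with this approach: an InputStream can only be read once, so an interceptor that drains it for storage has to hand a fresh stream to the rest of the chain. The sketch below shows that copy-and-replace step with plain JDK classes only. The capture and drain methods and the sink parameter are hypothetical names, not CXF APIs. Inside a CXF interceptor, the replacement stream would presumably be put back with message.setContent(InputStream.class, ...), and the phase in which the interceptor must run is something to verify against the CXF documentation.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

class StreamCaptureDemo {
    /** Reads the whole stream, wrapping IOException so callers need no try/catch. */
    static byte[] drain(InputStream in) {
        try {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Hands the raw text to the sink (e.g. code that writes it to a database)
     * and returns a new stream over the same bytes for downstream consumers.
     */
    static InputStream capture(InputStream in, Charset charset, Consumer<String> sink) {
        byte[] raw = drain(in);
        sink.accept(new String(raw, charset));
        return new ByteArrayInputStream(raw);
    }

    public static void main(String[] args) {
        String xml = "<soap:Envelope><soap:Body/></soap:Envelope>";
        InputStream original = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        InputStream replay = capture(original, StandardCharsets.UTF_8,
            text -> System.out.println("captured: " + text));
        // The replacement stream can still be read in full by later consumers.
        System.out.println("replayed: " + new String(drain(replay), StandardCharsets.UTF_8));
    }
}
```

Note that this alone does not solve the correlation problem from the question: the captured XML still has to be tied back to the calling code, for example by storing it on the message exchange or by confining each proxy to one thread as in the other answer.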
