如何在循环中调用AkkaHttpPOST(1000-10000次)



我正在用Java学习阿卡语。我和两个演员一起写了一个简单的节目。

我的第一个actorActorA是用包含1000个字符串的列表调用的ActorA循环遍历列表,并为每个元素调用strong>Actor B。

ActorB使用从ActorA接收的String参数对外部服务进行HttpPOST调用。

我预计ActorB将成功进行1000次HttpPOST调用,并将收到相同数量的响应。但是ActorB可以在80-120次之间随机发出POST请求,然后停止发出POST调用。

我尝试提供一个自定义调度程序,因为HTTPPOST调用是一个阻塞操作,但仍然没有成功!!

请参阅下面给出的代码和配置。

public class ActorA extends AbstractActor {
static public Props props() {
return Props.create(ActorA.class);
}

static public class IdWrapper {
List<String> ids;
public IdWrapper(List<String> ids) {
this.ids = ids;
}
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(IdWrapper.class, this::process)
.build();
}
private void process(IdWrapper msg) {
msg.ids.forEach(id -> {
context().actorSelection("actorB").tell(new MessageForB(id), ActorRef.noSender());
}
);
}

}

public class ActorB extends AbstractActor {   
final Http http = Http.get(getContext().system());
final Materializer materializer = ActorMaterializer.create(context());    
public static Props props() {
return Props.create(ActorB.class);
}
static public class MessageForB implements Serializable {
String id;
public MessageForB(String id) {
this.id = id;
}
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(MessageForB.class, this::process)
.build();
}
private void process(MessageForB messageForB) {
ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher");
/**
* Get id from request
*/
String reqId = messageForB.id;
/**
* Prepare request
*/
XmlRequest requestEntity = getRequest(Stream.of(reqId).collect(Collectors.toList()));
String requestAsString = null;

try {
/**
* Create and configure JAXBMarshaller.
*/
JAXBContext jaxbContext = JAXBContext.newInstance(XmlRequest.class);
Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
jaxbMarshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
/**
* Convert request entity to string before making POST request.
*/
StringWriter sw = new StringWriter();
jaxbMarshaller.marshal(requestEntity, sw);
requestAsString = sw.toString();
} catch (JAXBException e) {
e.printStackTrace();
}

/**
* Create RequestEntity from request string.
*/
RequestEntity entity = HttpEntities.create(
MediaTypes.APPLICATION_XML.toContentType(HttpCharsets.ISO_8859_1),
requestAsString);
/**
* Create Http POST with necessary headers and call
*/
final CompletionStage<HttpResponse> responseFuture =
http.singleRequest(HttpRequest.POST("http://{hostname}:{port}/path")
.withEntity(entity));
responseFuture
.thenCompose(httpResponse -> {
/**
* Convert response into String
**/
final CompletionStage<String> res = Unmarshaller.entityToString().unmarshal
(httpResponse.entity(), ec, materializer);
/**
* Consume response bytes
**/
httpResponse.entity().getDataBytes().runWith(Sink.ignore(), materializer);
return res;
})
.thenAccept(s -> {
try {
/**
* Deserialize string to DTO.
*/
MyResponse MyResponse = getMyResponse(s);
// further processing..
} catch (JAXBException e) {
e.printStackTrace();
}
});
}
private XmlRequest getRequest(List<String> identifiers){
XmlRequest request = new XmlRequest();
// Business logic to create req entity
return request;
}
private MyResponse getMyResponse(String s) throws JAXBException {
JAXBContext jaxbContext = JAXBContext.newInstance
(MyResponse.class);
javax.xml.bind.Unmarshaller jaxbUnmarshaller = jaxbContext
.createUnmarshaller();
StringReader reader = new StringReader(s);
return (MyResponse)
jaxbUnmarshaller.unmarshal(reader);
}

}

my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 5
core-pool-size-max = 20
}
throughput = 1
}

我在哪里可以改进或更正我的代码,以便ActorB能够成功地对ActorA发送的所有项目进行HttpPOST调用?

正如我所看到的,您已经使用了http.singleReques

根据akka http文档

对于这些情况,AkkaHTTP提供了HTTP().singleRequest(…)方法,它只需将HttpRequest实例转换为Future[HttpResponse]。在内部,请求通过请求的有效URI的(缓存的)主机连接池进行调度。

http.singleRequest使用连接池来处理请求,因此您需要从akka-http-config增加连接池中的连接数。

在主机连接池部分,默认值为:

host-connection-pool {
max-connections = 4
min-connections = 0
max-retries = 5
max-open-requests = 32
pipelining-limit = 1
idle-timeout = 30 s
}

解决方案2:

使用http.outgoingConnection

根据akkahttp文档,它将为每个请求创建一个特定的连接。因此,您可以在没有连接池的情况下并行处理1000个连接。

使用连接级API,您可以通过物化HTTP(.outgoingConnection(…)方法返回的流来打开与目标端点的新HTTP连接。这里有一个例子:

def run(req:String): Unit ={
val apiBaseUrl = "example.com" //without protocol
val path = "/api/update"
val body = HttpEntity(ContentTypes.`application/json`,req.getBytes)
val request = HttpRequest(HttpMethods.POST, path,entity = body)
val connectionFlow = Http().outgoingConnection(apiBaseUrl)
val result =   Source.single(request).via(connectionFlow).runWith(Sink.head)
result.onComplete{
case Success(value) =>
println(value)
case Failure(e)=>
e.printStackTrace()
}
}

最新更新