如何防止传入连接的 Akka TCP 流在配置的最大连接数连接后连接?

  • 本文关键字:连接 配置 TCP 何防止 Akka java akka-stream
  • 更新时间 :
  • 英文 :


[EDIT]:问题已经解决,Artur提供的解决方案被添加为最后的编辑。

我试图实现的想法是,TCP 服务器允许 n 个连接,如果我得到 n+1 个连接,则不允许连接。

因此,我需要以某种方式取消连接,然后继续将该特定流连接到 Sink.cancel()。

我有一个连接到自定义流的传入连接,该流根据连接计数对传入连接进行分区。一旦超过最大连接计数,分区逻辑就会将其定向到连接到 Sink.cancel 的插座。

期望是立即取消连接,但它允许客户端连接,然后在一段时间后断开连接。

也许我遇到了与答案中提到的相同的问题 为什么 Akka TCP 流服务器在没有 connection.handlewith. 的流时断开客户端连接? 找不到要处理的流,并且它徘徊并断开连接。

我正在寻找

  1. 一个干净的解决方案,当超过最大值时不允许传入连接。
  2. Sink.cancel() 在做什么(如果有的话,它正在做某事)。
package com.example;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;

public class SimpleStream03 {
private static int connectionCount = 0;
private static int maxConnectioCount = 2;
public static void runServer() {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888); 
Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {

System.out.println("Handler Sink Connection Count " + connectionCount);
System.out.println("Handler Sink Client connected from: " + conn.remoteAddress());
conn.handleWith(Flow.of(ByteString.class), actorSystem);
});
Flow<IncomingConnection, IncomingConnection, NotUsed> connectioncountFlow = Flow
.fromGraph(GraphDSL.create(builder -> {
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
FlowShape<IncomingConnection, IncomingConnection> inFlowShape = builder
.add(Flow.of(IncomingConnection.class).map(conn -> {
connectionCount++;
return conn;
}));
UniformFanOutShape<IncomingConnection, IncomingConnection> partition = builder
.add(Partition.create(IncomingConnection.class, 2, param -> {
if (connectionCount > maxConnectioCount) {
connectionCount = maxConnectioCount;
System.out.println("Outlet 0 -> Sink.cancelled");
return 0;
}
System.out.println("Outlet 1 -> forward to handler");
return 1;
}));
builder.from(inFlowShape).toFanOut(partition);
builder.from(partition.out(0)).to(sinkCancelled);
return new FlowShape<>(inFlowShape.in(), partition.out(1));
}));
CompletionStage<ServerBinding> bindingFuture = source.via(connectioncountFlow).to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to  : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
public static void main(String[] args) throws InterruptedException {
SimpleStream03.runServer();
}
}

输出确认分区正常工作,并且 2 个连接正在访问主接收器处理程序。

Server started, listening on: /127.0.0.1:8888
Outlet 1 -> forward to handler
Handler Sink Connection Count 1
Handler Sink Client connected from: /127.0.0.1:60327
Outlet 1 -> forward to handler
Handler Sink Connection Count 2
Handler Sink Client connected from: /127.0.0.1:60330
Outlet 0 -> Sink.cancelled

编辑:实现接受的答案,以下更改可防止在超出阈值后传入连接。客户端看到对等方重置的连接

+---------------------------------------------------------+
|                                                         |
|                                         Fail Flow       |
|                                        +-------------+  |
|                                    +-->+Sink  |Source|  |
|                                    |   |cancel|fail  |  |
|                                    |   +-------------+  |
|                      +----------+  |                    |
|                      |          |  |                    |
|  +----------+        |        O0+--+                    |
connections|FLOW      |        |          |  O0:count > threshold |
+-------+-->          +------->+ Partition|                       |
|  |count++   |        |        O1+----------------------------->
|  +----------+        |          |                       |
|                      |          |  O1:count <= threshold|
|                      +----------+                       |
|                                                         |
+---------------------------------------------------------+

取代

SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());

Sink<IncomingConnection, CompletionStage<Done>> connectionCancellingSink = Sink.foreach(ic -> ic
.handleWith(Flow.fromSinkAndSource(Sink.cancelled(), Source.failed(new Throwable("killed"))),
actorSystem));// Sink.ignore and Sink.cancel give me the same expected result
SinkShape<IncomingConnection> sinkCancelledShape = builder.add(connectionCancellingSink);

Sink.cancelled()立即取消上游 (https://doc.akka.io/docs/akka/current/stream/operators/Sink/cancelled.html)。

但是,您的Partition是在 eagerCancel 设置为false的情况下创建的


/**
* Create a new `Partition` operator with the specified input type, `eagerCancel` is `false`.
*
* @param clazz a type hint for this method
* @param outputCount number of output ports
* @param partitioner function deciding which output each element will be targeted
*/
def create[T](
@unused clazz: Class[T],
outputCount: Int,
partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] =
new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false)

这意味着Partition只有在其所有下游连接取消时才会取消。这不是你想要的。但是你也不想eagerCancel=true,因为这意味着第一个超过限制的连接将破坏整个Partition,从而破坏你的所有连接。基本上是破坏整个服务器。

也许从嵌套流的角度考虑这里的情况很有用。顶级Source<IncomingConnection>表示接受的 TCP 连接流。您不想取消该流。如果你这样做,你只是杀死了你的服务器。每个IncomingConnection代表一个单独的 TCP 连接。在这种连接上发生的字节交换也表示为流。对于超过阈值的每个连接,您要取消的正是此流。

为此,您可以定义一个连接取消Sink如下所示:

Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.cancelled(), Source.empty()),
actorSystem));

IncomingConnection允许您使用handleWith方法附加处理程序。为此,您需要一个Flow,因为您既消耗来自客户端的字节,也可能将字节发送到客户端(传入的字节进入Flow以及您想要发送回客户端的任何内容,您需要在Flow的输出上生成)。在我们的例子中,我们只想立即取消该流。您可以使用Flow.fromSinkAndSource来获得Flow...SinkSource.您可以利用它来插件Sink.cancelledSource.empty。因此,Source.empty意味着我们不会向连接发送任何字节,Sink.cancelled将立即取消流,并希望取消底层TCP连接。让我们试一试。

最后要做的是将我们新的取消Sink插入Partition

SinkShape<IncomingConnection> sinkCancelled =
builder.add(connectionCancellingSink);
//...the rest stays the same
builder.from(partition.out(0))
.to(sinkCancelled);

如果这样做,在第三个连接上,您将看到以下消息:

Not aborting connection from 127.0.0.1:49874 because downstream cancelled stream without failure

所以Sink.cancelled()并没有真正触发你想要的东西。让我们重新定义我们的取消Flow

Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(new Throwable("killed"))),
actorSystem));

现在,这是使用Sink.ignore()来忽略传入的字节,但通过Source.failed(...)使流失败。这将导致连接立即终止,并且跟踪跟踪将打印在服务器输出上。如果你想保持安静,你可以在没有堆栈跟踪的情况下创建异常:

public static class TerminatedException extends Exception
{
public TerminatedException(String message)
{
super(message, null, false, false);
}
}

然后使用它使连接流失败

Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(
new TerminatedException(("killed"))))

这样你就会得到更干净的日志。

最新更新