[EDIT]:问题已经解决,Artur提供的解决方案被添加为最后的编辑。
我试图实现的想法是,TCP 服务器允许 n 个连接,如果我得到 n+1 个连接,则不允许连接。
因此,我需要以某种方式取消连接,然后继续将该特定流连接到 Sink.cancel()。
我有一个连接到自定义流的传入连接,该流根据连接计数对传入连接进行分区。一旦超过最大连接计数,分区逻辑就会将其定向到连接到 Sink.cancel 的插座。
期望是立即取消连接,但它允许客户端连接,然后在一段时间后断开连接。
也许我遇到了与答案中提到的相同的问题 为什么 Akka TCP 流服务器在没有 connection.handlewith. 的流时断开客户端连接? 找不到要处理的流,并且它徘徊并断开连接。
我正在寻找
- 一个干净的解决方案,当超过最大值时不允许传入连接。
- Sink.cancel() 在做什么(如果有的话,它正在做某事)。
package com.example;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.FlowShape;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;
public class SimpleStream03 {
private static int connectionCount = 0;
private static int maxConnectioCount = 2;
public static void runServer() {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888);
Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Handler Sink Connection Count " + connectionCount);
System.out.println("Handler Sink Client connected from: " + conn.remoteAddress());
conn.handleWith(Flow.of(ByteString.class), actorSystem);
});
Flow<IncomingConnection, IncomingConnection, NotUsed> connectioncountFlow = Flow
.fromGraph(GraphDSL.create(builder -> {
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
FlowShape<IncomingConnection, IncomingConnection> inFlowShape = builder
.add(Flow.of(IncomingConnection.class).map(conn -> {
connectionCount++;
return conn;
}));
UniformFanOutShape<IncomingConnection, IncomingConnection> partition = builder
.add(Partition.create(IncomingConnection.class, 2, param -> {
if (connectionCount > maxConnectioCount) {
connectionCount = maxConnectioCount;
System.out.println("Outlet 0 -> Sink.cancelled");
return 0;
}
System.out.println("Outlet 1 -> forward to handler");
return 1;
}));
builder.from(inFlowShape).toFanOut(partition);
builder.from(partition.out(0)).to(sinkCancelled);
return new FlowShape<>(inFlowShape.in(), partition.out(1));
}));
CompletionStage<ServerBinding> bindingFuture = source.via(connectioncountFlow).to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
public static void main(String[] args) throws InterruptedException {
SimpleStream03.runServer();
}
}
输出确认分区正常工作,并且 2 个连接正在访问主接收器处理程序。
Server started, listening on: /127.0.0.1:8888
Outlet 1 -> forward to handler
Handler Sink Connection Count 1
Handler Sink Client connected from: /127.0.0.1:60327
Outlet 1 -> forward to handler
Handler Sink Connection Count 2
Handler Sink Client connected from: /127.0.0.1:60330
Outlet 0 -> Sink.cancelled
编辑:实现接受的答案,以下更改可防止在超出阈值后传入连接。客户端看到对等方重置的连接
+---------------------------------------------------------+
| |
| Fail Flow |
| +-------------+ |
| +-->+Sink |Source| |
| | |cancel|fail | |
| | +-------------+ |
| +----------+ | |
| | | | |
| +----------+ | O0+--+ |
connections|FLOW | | | O0:count > threshold |
+-------+--> +------->+ Partition| |
| |count++ | | O1+----------------------------->
| +----------+ | | |
| | | O1:count <= threshold|
| +----------+ |
| |
+---------------------------------------------------------+
取代
SinkShape<IncomingConnection> sinkCancelled = builder.add(Sink.cancelled());
跟
Sink<IncomingConnection, CompletionStage<Done>> connectionCancellingSink = Sink.foreach(ic -> ic
.handleWith(Flow.fromSinkAndSource(Sink.cancelled(), Source.failed(new Throwable("killed"))),
actorSystem));// Sink.ignore and Sink.cancel give me the same expected result
SinkShape<IncomingConnection> sinkCancelledShape = builder.add(connectionCancellingSink);
Sink.cancelled()
立即取消上游 (https://doc.akka.io/docs/akka/current/stream/operators/Sink/cancelled.html)。
但是,您的Partition
是在 eagerCancel 设置为false
的情况下创建的
/**
* Create a new `Partition` operator with the specified input type, `eagerCancel` is `false`.
*
* @param clazz a type hint for this method
* @param outputCount number of output ports
* @param partitioner function deciding which output each element will be targeted
*/
def create[T](
@unused clazz: Class[T],
outputCount: Int,
partitioner: function.Function[T, Integer]): Graph[UniformFanOutShape[T, T], NotUsed] =
new scaladsl.Partition(outputCount, partitioner.apply, eagerCancel = false)
这意味着Partition
只有在其所有下游连接取消时才会取消。这不是你想要的。但是你也不想eagerCancel=true
,因为这意味着第一个超过限制的连接将破坏整个Partition
,从而破坏你的所有连接。基本上是破坏整个服务器。
也许从嵌套流的角度考虑这里的情况很有用。顶级Source<IncomingConnection>
表示接受的 TCP 连接流。您不想取消该流。如果你这样做,你只是杀死了你的服务器。每个IncomingConnection
代表一个单独的 TCP 连接。在这种连接上发生的字节交换也表示为流。对于超过阈值的每个连接,您要取消的正是此流。
为此,您可以定义一个连接取消Sink
如下所示:
Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.cancelled(), Source.empty()),
actorSystem));
IncomingConnection
允许您使用handleWith
方法附加处理程序。为此,您需要一个Flow
,因为您既消耗来自客户端的字节,也可能将字节发送到客户端(传入的字节进入Flow
以及您想要发送回客户端的任何内容,您需要在Flow
的输出上生成)。在我们的例子中,我们只想立即取消该流。您可以使用Flow.fromSinkAndSource
来获得Flow
...Sink
和Source
.您可以利用它来插件Sink.cancelled
和Source.empty
。因此,Source.empty
意味着我们不会向连接发送任何字节,Sink.cancelled
将立即取消流,并希望取消底层TCP连接。让我们试一试。
最后要做的是将我们新的取消Sink
插入Partition
SinkShape<IncomingConnection> sinkCancelled =
builder.add(connectionCancellingSink);
//...the rest stays the same
builder.from(partition.out(0))
.to(sinkCancelled);
如果这样做,在第三个连接上,您将看到以下消息:
Not aborting connection from 127.0.0.1:49874 because downstream cancelled stream without failure
所以Sink.cancelled()
并没有真正触发你想要的东西。让我们重新定义我们的取消Flow
:
Sink<IncomingConnection, CompletionStage<Done>>
connectionCancellingSink =
Sink.foreach(
ic ->
ic.handleWith(
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(new Throwable("killed"))),
actorSystem));
现在,这是使用Sink.ignore()
来忽略传入的字节,但通过Source.failed(...)
使流失败。这将导致连接立即终止,并且跟踪跟踪将打印在服务器输出上。如果你想保持安静,你可以在没有堆栈跟踪的情况下创建异常:
public static class TerminatedException extends Exception
{
public TerminatedException(String message)
{
super(message, null, false, false);
}
}
然后使用它使连接流失败
Flow.fromSinkAndSource(Sink.ignore(),
Source.failed(
new TerminatedException(("killed"))))
这样你就会得到更干净的日志。