我试图将单个输入流的内容推送到从tcp套接字获得的多个输出流。我没有找到任何现有的解决方案,所以我从零开始做了一些东西,但我真的觉得我在重新发明轮子。我的要求是:
- Java。如果它能在Android上运行就好了,但这是可选的。
- 最多10个客户端
- 客户端可以频繁添加/删除
- 每个客户端应该接收到大致相同的字节/秒
- 当添加新客户端时,吞吐量应该尽可能少地下降
- 解决方案不应该特定于某些数据(即,当我用原始h264测试的解决方案应该处理文本流一样好)
那么,首先,是否有一个库满足这些要求?
如果不是,我如何提高我自己的解决方案的性能(见下文)。我这样使用它:
- 在应用程序启动时,我在一个新线程中运行这个类的实例。
- 如果客户端连接,相应套接字的输出流附加到该实例
当它工作时,每个附加了N个客户机的客户机上的吞吐量非常不稳定。似乎没有关于n的吞吐量函数(免责声明:我从使用多个线程的单个客户端进行测试)。我认为这个解决方案的性能主要受到两个因素的影响:
- 线程同步的消费者收集,添加/删除消费者将阻塞写入所有流。
- 集合的迭代时间,因为iterator()可能每次都会创建一个新实例,但我需要remove()函数。
如果有任何建议,我将非常感谢。
public class InfiniteStreamingResource implements Runnable {
private LinkedHashSet<OutputStream> consumers;
private byte[] buf;
private boolean running;
InputStream stream;
Logger logger = Logger.getLogger(InfiniteStreamingResource.class);
public InfiniteStreamingResource(InputStream stream, int bufSize) {
this.stream = stream
consumers = new LinkedHashSet<>(100);
buf = new byte[bufSize];
}
public synchronized void attachConsumer(OutputStream consumer) {
consumers.add(consumer);
}
public synchronized void stop() {
running = false;
}
@Override
public void run() {
running = true;
int bytesRead;
Iterator<OutputStream> it;
OutputStream current;
try {
while ((bytesRead = stream.read(buf)) > 0 && running) {
synchronized(this) {
it = consumers.iterator();
while (it.hasNext()) {
current = it.next();
try {
current.write(buf, 0, bytesRead);
} catch (IOException e) {
if (e instanceof SocketException) {
it.remove();
try {
current.close();
} catch (IOException inner) {
//ignore
}
} else {
e.printStackTrace();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
for (OutputStream consumer : consumers) { //if stopped, close any remaining streams
try {
consumer.close();
} catch (IOException e) {
//ignore
}
}
}
}
我会根据你想要发送的数据来决定。
如果它是原始字节数据(例如视频流),我不知道一个库。
在文本或Java对象的情况下使用RabbitMQ因此,您需要安装Broker。它可以接收来自发布者的消息并将其转发给所有订阅的客户端。
它很容易实现(在Android中也是如此)。