将单个输入流的内容并发地写入多个输出流



我试图将单个输入流的内容推送到从tcp套接字获得的多个输出流。我没有找到任何现有的解决方案,所以我从零开始做了一些东西,但我真的觉得我在重新发明轮子。我的要求是:

  • Java。如果它能在Android上运行就好了,但这是可选的。
  • 最多10个客户端
  • 客户端可以频繁添加/删除
  • 每个客户端应该接收到大致相同的字节/秒
  • 当添加新客户端时,吞吐量应该尽可能少地下降
  • 解决方案不应该特定于某些数据(即,当我用原始h264测试的解决方案应该处理文本流一样好)

那么,首先,是否有一个库满足这些要求?

如果不是,我如何提高我自己的解决方案的性能(见下文)。我这样使用它:

    在应用程序启动时,我在一个新线程中运行这个类的实例。
  • 如果客户端连接,相应套接字的输出流附加到该实例

当它工作时,每个附加了N个客户机的客户机上的吞吐量非常不稳定。似乎没有关于n的吞吐量函数(免责声明:我从使用多个线程的单个客户端进行测试)。我认为这个解决方案的性能主要受到两个因素的影响:

  • 线程同步的消费者收集,添加/删除消费者将阻塞写入所有流。
  • 集合的迭代时间,因为iterator()可能每次都会创建一个新实例,但我需要remove()函数。

如果有任何建议,我将非常感谢。

public class InfiniteStreamingResource implements Runnable {
    private LinkedHashSet<OutputStream> consumers;
    private byte[] buf;
    private boolean running;
    InputStream stream;
    Logger logger = Logger.getLogger(InfiniteStreamingResource.class);
    public InfiniteStreamingResource(InputStream stream, int bufSize) {
        this.stream = stream
        consumers = new LinkedHashSet<>(100);
        buf = new byte[bufSize];
    }
    public synchronized void attachConsumer(OutputStream consumer) {
        consumers.add(consumer);
    }
    public synchronized void stop() {
        running = false;
    }
    @Override
    public void run() {
        running = true;
        int bytesRead;
        Iterator<OutputStream> it;
        OutputStream current;
        try {
            while ((bytesRead = stream.read(buf)) > 0 && running) {
                synchronized(this) {
                    it = consumers.iterator();
                    while (it.hasNext()) {
                        current = it.next();
                        try {
                            current.write(buf, 0, bytesRead);
                        } catch (IOException e) {
                            if (e instanceof SocketException) {
                                it.remove();
                                try {
                                    current.close();
                                } catch (IOException inner) {
                                    //ignore
                                }
                            } else {
                            e.printStackTrace();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (OutputStream consumer : consumers) { //if stopped, close any remaining streams
            try {
                consumer.close();
            } catch (IOException e) {
                //ignore
            }
        }
    }

}

我会根据你想要发送的数据来决定。

如果它是原始字节数据(例如视频流),我不知道一个库。

在文本或Java对象的情况下使用RabbitMQ因此,您需要安装Broker。它可以接收来自发布者的消息并将其转发给所有订阅的客户端。

它很容易实现(在Android中也是如此)。

相关内容

  • 没有找到相关文章

最新更新