使用Rxx实现TCP时间服务器



我正在尝试用Rxx创建一个TCP时间服务器。思路很简单:一个TCP服务器,每秒向每个已连接的客户端广播一次服务器的本地时间。我可以连接到这个服务器,也能看到'ticks'可观察对象被订阅,但客户端收不到任何数据。我遗漏了什么?下面是我写的服务器代码。

/// <summary>
/// TCP time server built on Rxx. Listens on the loopback endpoint (port 15007) and
/// writes the server's local time to every accepted connection once per second.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the listener, wires each accepted connection to the shared tick stream,
    /// and keeps running until ENTER is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // One shared clock for all clients: Publish().RefCount() multicasts a single
        // underlying interval timer to every subscriber instead of starting one per client.
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now.ToString())
            .Do(tick => Console.WriteLine("tick: {0}", tick))
            .Publish()
            .RefCount();
        IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007);
        var listener = ObservableSocket.Accept(
                AddressFamily.InterNetwork,
                SocketType.Stream,
                ProtocolType.Tcp,
                serverAddress,
                20)
                .Do(s => Console.WriteLine("connection accepted {0}", s.RemoteEndPoint))
                // AutoFlush must be enabled. A StreamWriter buffers its output and, by default,
                // only pushes it to the underlying stream when the buffer fills or the writer is
                // flushed/disposed. With one short line per second and no explicit Flush call,
                // nothing would ever reach the socket and clients would receive no data.
                .Select(s => new StreamWriter(new NetworkStream(s, true)) { AutoFlush = true });

        using (listener.Subscribe(
                // NOTE(review): the per-client subscription returned by ticks.Subscribe is
                // discarded, and the Task returned by WriteLineAsync is not observed, so a
                // disconnected client is neither reported nor unsubscribed here.
                client => ticks.Subscribe(
                    tick => client.WriteLineAsync(tick),
                    (tex) => Console.WriteLine("ticks error: {0}", tex.Message),
                    () => Console.WriteLine("ticks completed")
                ),
                (ex) => Console.WriteLine("server error: {0}", ex.Message),
                () => Console.WriteLine("server completed")
            )
        )
        {
            Console.WriteLine("Time server listening {0}", serverAddress);
            Console.WriteLine("Press ENTER to stop...");
            Console.ReadLine();
        }
    }
}

下面是一个不基于Rxx的解决方案,我已经让它正常工作了,但它不是很漂亮。希望Rxx高手能给我一个更优雅的解决方案。

/// <summary>
/// TCP time server built on plain Rx. Accepts connections on the loopback endpoint
/// (port 15007) and writes the server's local time to every connected client once per second.
/// </summary>
class Program
{
    // Currently connected clients. Both the accept callback and the tick callback touch
    // this list, so every access is made while holding lock(clients).
    static List<TcpClient> clients = new List<TcpClient>();

    /// <summary>
    /// Starts accepting connections and broadcasting ticks, then runs until ENTER is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007);
        using (Listen(serverAddress).Subscribe(client => { lock (clients) { clients.Add(client); } }))
        using (Ticks().Subscribe(tick => Broadcast(tick)))
        {
            Console.WriteLine("Time server listening {0}", serverAddress);
            Console.WriteLine("Press ENTER to stop...");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Writes one tick as a text line to every connected client. A client whose write
    /// fails is removed from the list and closed.
    /// </summary>
    /// <param name="tick">The timestamp to send.</param>
    static void Broadcast(DateTime tick)
    {
        string line = tick.ToString();
        lock (clients)
        {
            int i = 0;
            while (i < clients.Count)
            {
                var client = clients[i];
                try
                {
                    // leaveOpen: true keeps the client's NetworkStream usable for the next tick;
                    // disposing the writer only flushes the buffered line to the socket.
                    using (var writer = new StreamWriter(client.GetStream(), Encoding.ASCII, client.Client.SendBufferSize, true))
                    {
                        writer.WriteLine(line);
                    }

                    // Advance only after the writer has been disposed: the buffered line is
                    // actually sent during Dispose, so that is where a dead connection throws.
                    // Incrementing earlier would make the removal below skip the next client.
                    i++;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("exception: {0}", ex.Message);
                    // Do not advance i: after removal the next client occupies index i.
                    clients.RemoveAt(i);
                    // Release the socket of the failed client instead of leaking it.
                    client.Close();
                    Console.WriteLine("client disconnected");
                }
            }
        }
    }

    /// <summary>
    /// Creates a shared observable of accepted TCP clients on the given endpoint.
    /// </summary>
    /// <param name="endpoint">Local endpoint to listen on.</param>
    /// <returns>
    /// An observable that yields each accepted <see cref="TcpClient"/>. The listener is
    /// started on subscription and stopped when the subscription is disposed.
    /// </returns>
    static IObservable<TcpClient> Listen(IPEndPoint endpoint)
    {
        return Observable.Create<TcpClient>(
            observer =>
            {
                TcpListener listener = new TcpListener(endpoint);
                listener.Start();
                // FromAsync accepts a single connection per subscription; Repeat() re-subscribes
                // after each accepted client and Retry() re-subscribes after a failed accept.
                var subscription = Observable
                    .FromAsync(listener.AcceptTcpClientAsync)
                    .Retry()
                    .Repeat()
                    .Do(client => Console.WriteLine("connection accepted {0}", client.Client.RemoteEndPoint))
                    .Subscribe(observer);
                // Listed in this order so the accept loop is disposed alongside stopping the listener.
                return new CompositeDisposable(subscription,
                    Disposable.Create(() => listener.Stop()));
            })
            .Publish()
            .RefCount();
    }

    /// <summary>
    /// Creates a shared observable that yields the server's local time once per second.
    /// </summary>
    /// <returns>An observable of <see cref="DateTime.Now"/> samples, logged to the console.</returns>
    static IObservable<DateTime> Ticks()
    {
        return Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => DateTime.Now)
            .Do(tick => Console.WriteLine("tick: {0}", tick))
            .Publish()
            .RefCount();
    }
}

相关内容

  • 没有找到相关文章