我正在尝试用Rxx创建一个TCP时间服务器。这个想法只是一个Tcp服务器,每秒向每个连接的客户端广播服务器上的本地时间。我可以连接到这个服务器,我看到'ticks'可观察对象被订阅,但客户端没有接收任何数据。我遗漏了什么?这是我为服务器准备的代码。
class Program
{
static void Main(string[] args)
{
var ticks = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => DateTime.Now.ToString())
.Do(tick => Console.WriteLine("tick: {0}", tick))
.Publish()
.RefCount();
IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007);
var listener = ObservableSocket.Accept(
AddressFamily.InterNetwork,
SocketType.Stream,
ProtocolType.Tcp,
serverAddress,
20)
.Do(s => Console.WriteLine("connection accepted {0}", s.RemoteEndPoint))
.Select(s => new StreamWriter(new NetworkStream(s, true)));
using (listener.Subscribe(
client => ticks.Subscribe(
tick => client.WriteLineAsync(tick),
(tex) => Console.WriteLine("ticks error: {0}", tex.Message),
() => Console.WriteLine("ticks completed")
),
(ex) => Console.WriteLine("server error: {0}", ex.Message),
() => Console.WriteLine("server completed")
)
)
{
Console.WriteLine("Time server listening {0}", serverAddress);
Console.WriteLine("Press ENTER to stop...");
Console.ReadLine();
}
}
}
这是一个非基于Rxx的解决方案,我得到了工作,但它不是很漂亮。希望Rxx大师还能给我一个更优雅的解决方案。
class Program
{
static List<TcpClient> clients = new List<TcpClient>();
static void Main(string[] args)
{
IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007);
using(Listen(serverAddress).Subscribe(client => { lock(clients) { clients.Add(client); }}))
using (Ticks().Subscribe(
tick =>
{
lock(clients)
{
int i = 0;
while (i < clients.Count)
{
var client = clients[i];
try
{
using(var writer = new StreamWriter(client.GetStream(), Encoding.ASCII, client.Client.SendBufferSize, true))
{
writer.WriteLine(tick.ToString());
i++;
}
}
catch(Exception ex)
{
Console.WriteLine("exception: {0}", ex.Message);
clients.Remove(client);
Console.WriteLine("client disconnected");
}
}
}
})
)
{
Console.WriteLine("Time server listening {0}", serverAddress);
Console.WriteLine("Press ENTER to stop...");
Console.ReadLine();
}
}
static IObservable<TcpClient> Listen(IPEndPoint endpoint)
{
return Observable.Create<TcpClient>(
observer =>
{
TcpListener listener = new TcpListener(endpoint);
listener.Start();
var subscription = Observable
.FromAsync(listener.AcceptTcpClientAsync)
.Retry()
.Repeat()
.Do(client => Console.WriteLine("connection accepted {0}", client.Client.RemoteEndPoint))
.Subscribe(observer);
return new CompositeDisposable(subscription,
Disposable.Create(() => listener.Stop()));
})
.Publish()
.RefCount();
}
static IObservable<DateTime> Ticks()
{
return Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => DateTime.Now)
.Do(tick => Console.WriteLine("tick: {0}", tick))
.Publish()
.RefCount();
}
}