我是使用Rx的新手,我一直在尝试重写我的MVC/服务层(不是ASP!)来使用这个令人敬畏的新奇的Rx。我有一个类叫做Remote
,它封装了一个NetworkStream
。Remote
使用Rx来侦听来自NetworkStream
的字节,一旦它计算出它收到了完整的消息值数据,它将该数据解码为IMessage
。
我知道我如何从Remote
内部连续使用Rx从Stream
读取,但是我如何从Remote
将解码的IMessage
从该流发布到外部世界?我应该在c#中使用经典的事件风格,并让事件的消费者使用Observable.FromEvent
吗?
我只是问,因为我已经读到,IObservable
是不打算再实施了
我应该在c#中使用经典的事件样式并拥有事件的消费者使用
Observable.FromEvent
?
如果你没有被强制这样做,不要使用c#风格的事件来创建API 。IObservable<T>
是一个强大的、通用的、得到广泛支持的接口,它允许我们在轻松管理订阅的同时将事件视为一级公民。即使你的用户不使用Rx,他们也能够比使用c#事件更容易地理解和使用IObservable<T>
。他们如何处理这些事件取决于他们自己,但IObservable<T>
的抽象更清晰、更简单。
我听说IObservable不再被实现了。
实际上,我们的意思是可能没有理由自己实现IObservable<T>
,因为我们有工具为我们创建该类型的实例。
我们有Observable.Create(...)
,它允许我们从头开始创建可观察对象。我们有不同类型的主题,如Subject<T>
, BehaviorSubject<T>
, ReplaySubject<T>
等,它们可以用作代理,并允许我们向多个消费者多播值,我们有操作符,允许我们将任何IObservable<T>
转换/组合成另一种类型或类型的IObservable<T>
。
但是我如何从该流发布解码IMessage到外部世界从
Remote
?
在你的类/接口上公开一个IObservable<T>
。
public interface IRemote
{
public IObservable<IMessage> Messages { get; }
}
你可以用很多方法来实现它。首先,您可以这样做,以便每个对Messages
的订阅都获得它自己对底层逻辑的订阅…
public class Remote : IRemote
{
private IObservable<IMessage> _messages = ...;
public IObservable<IMessage> Message {
get {
return message;
}
}
}
或者你可以确保对底层逻辑只有一个订阅…
public class Remote : IRemote
{
private IObservable<IMessage> _messages = ...;
private IObservable<IMessage> _refCountedMessages
= this._messages
.Publish()
.RefCount();
public IObservable<IMessage> Message {
get {
return this._refCountedMessages;
}
}
}
或者您可以使连接过程在本质上非常显式。
public interface IRemote
{
public IObservable<IMessage> Messages { get; }
public IDisposable Connect();
}
public class Remote : IRemote
{
private IObservable<IMessage> _messages = ...;
private IObservable<IMessage> _connectableMessages
= this._messages
.Publish();
public IObservable<IMessage> Message {
get {
return this._connectableMessages;
}
}
public IDisposable Connect()
{
return this._connectableMessages.Connect();
}
}
我想你的问题类似于这个问题如何"重建线条"。使用Rx从串行端口读取的数据?
您将获得字节,而不是将字符串推送给您,然后将其更改为消息。没问题,您可以使用相同的WindowBy概念将字节序列分割成窗口,然后可以将其翻译/转换/映射/任何内容转换为IMessage
。
基于Christopher Harris的回答。下面是他提出的接口的实现。这里的要点是,您可以公开可观察序列,这些序列只是建立在底层可观察序列之上的查询。在这种情况下,消息序列只是网络序列上的查询。通过分层,我们得到了消费者想要的抽象层次。
void Main()
{
var networkStream = new NetworkStream();
var remote = new Remote(networkStream);
remote.GetMessages().Dump("remote.GetMessages()");
}
// Define other methods and classes here
public class NetworkStream
{
//Fake getting bytes off the wire or disk
public IObservable<byte> GetNetworkStream()
{
var text = @"Line 1.
Hello line 2.
3rd and final line!";
return Observable.Zip(
UTF8Encoding.UTF8.GetBytes(text).ToObservable(),
Observable.Interval(TimeSpan.FromMilliseconds(100)),
(character, time)=>character);
}
}
public interface IMessage
{
string Content {get;}
}
public class Message : IMessage
{
public Message(string content)
{
Content = content;
}
public string Content {get; private set;}
}
public interface IRemote
{
IObservable<IMessage> GetMessages();
}
public class Remote : IRemote
{
private readonly NetworkStream _networkStream;
private readonly byte[] _delimiter = UTF8Encoding.UTF8.GetBytes(Environment.NewLine);
public Remote(NetworkStream networkStream)
{
_networkStream = networkStream;
}
public IObservable<IMessage> GetMessages()
{
return _networkStream.GetNetworkStream()
.WindowByExclusive(b => _delimiter.Contains(b))
.SelectMany(window=>window.ToArray().Select(bytes=>UTF8Encoding.UTF8.GetString(bytes)))
.Select(content=>new Message(content));
}
//TODO Add IDispose and clean up your NetworkStream
}
public static class ObservableEx
{
public static IObservable<IObservable<T>> WindowByExclusive<T>(this IObservable<T> input, Func<T, bool> isWindowBoundary)
{
return Observable.Create<IObservable<T>>(o=>
{
var source = input.Publish().RefCount();
var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default);
return left.GroupJoin(
source.Where(c=>!isWindowBoundary(c)),
x=>source.Where(isWindowBoundary),
x=>Observable.Empty<Unit>(),
(_,window)=>window)
.Subscribe(o);
});
}
}