我正在学习RX,想使用控制台。ReadLine作为可观察序列的源。
我知道我可以使用"yield return"来创建"IEnumerable",但是对于我的具体用例,我决定创建一个c#事件,这样潜在的许多观察者将能够共享相同的键盘输入。
下面是我的代码:class Program
{
private delegate void OnNewInputLineHandler(string line);
private static event OnNewInputLineHandler OnNewInputLineEvent = _ => {};
static void Main(string[] args)
{
Task.Run((Action) GetInput);
var input = ConsoleInput();
input.Subscribe(s=>Console.WriteLine("1: " + s));
Thread.Sleep(30000);
}
private static void GetInput()
{
while (true)
OnNewInputLineEvent(Console.ReadLine());
}
private static IObservable<string> ConsoleInput()
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
OnNewInputLineHandler h = observer.OnNext;
OnNewInputLineEvent += h;
return Disposable.Create(() => { OnNewInputLineEvent -= h; });
});
}
}
我的问题-当我像上面所示的那样运行GetInput方法时,第一个输入行没有被发送到序列(但它被发送到事件处理程序)。
但是,如果我用下面的版本替换它,一切都像预期的那样工作:
private static void GetInput()
{
while (true)
{
var s = Console.ReadLine();
OnNewInputLineEvent(s);
}
}
有人能解释一下为什么会发生这种情况吗?
你这是在自找麻烦。几乎总有一种方法可以使Rx变得简单。这只是一个学会从功能上思考而不是过程上思考的问题。
这就是你所需要的:
class Program
{
static void Main(string[] args)
{
var subscription = ConsoleInput().Subscribe(s => Console.WriteLine("1: " + s));
Thread.Sleep(30000);
subscription.Dispose();
}
private static IObservable<string> ConsoleInput()
{
return
Observable
.FromAsync(() => Console.In.ReadLineAsync())
.Repeat()
.Publish()
.RefCount()
.SubscribeOn(Scheduler.Default);
}
}
这允许多个订阅者通过.Publish().RefCount()
共享一个输入。.SubscribeOn(Scheduler.Default)
将订阅推送到一个新线程-没有它,您将阻塞订阅。
如果您将Task.Run((Action) GetInput);
移动到订阅后,您的代码将按预期工作。这是因为在您的原始版本中,OnNewInputEvent(Console.ReadLine())
的第一次调用是在您将OnNewInputLineEvent挂接到observer.OnNext
之前运行的。