将Console.ReadLine()传递给c#事件



我正在学习RX,想使用控制台。ReadLine作为可观察序列的源。

我知道我可以使用"yield return"来创建"IEnumerable",但是对于我的具体用例,我决定创建一个c#事件,这样潜在的许多观察者将能够共享相同的键盘输入。

下面是我的代码:
class Program
{
    private delegate void OnNewInputLineHandler(string line);
    private static event OnNewInputLineHandler OnNewInputLineEvent = _ => {};
    static void Main(string[] args)
    {
        Task.Run((Action) GetInput);
        var input = ConsoleInput();
        input.Subscribe(s=>Console.WriteLine("1: " + s));
        Thread.Sleep(30000);
    }
    private static void GetInput()
    {
        while (true)
            OnNewInputLineEvent(Console.ReadLine());
    }
    private static IObservable<string> ConsoleInput()
    {
        return Observable.Create<string>(
        (IObserver<string> observer) =>
        {
            OnNewInputLineHandler h = observer.OnNext;
            OnNewInputLineEvent += h;
            return Disposable.Create(() => { OnNewInputLineEvent -= h; });
        });
    }
}

我的问题-当我像上面所示的那样运行GetInput方法时,第一个输入行没有被发送到序列(但它被发送到事件处理程序)。

但是,如果我用下面的版本替换它,一切都像预期的那样工作:

private static void GetInput()
{
    while (true)
    {
        var s = Console.ReadLine();
        OnNewInputLineEvent(s);
    }
}

有人能解释一下为什么会发生这种情况吗?

你这是在自找麻烦。几乎总有一种方法可以使Rx变得简单。这只是一个学会从功能上思考而不是过程上思考的问题。

这就是你所需要的:

class Program
{
    static void Main(string[] args)
    {
        var subscription = ConsoleInput().Subscribe(s => Console.WriteLine("1: " + s));
        Thread.Sleep(30000);
        subscription.Dispose();
    }
    private static IObservable<string> ConsoleInput()
    {
        return
            Observable
                .FromAsync(() => Console.In.ReadLineAsync())
                .Repeat()
                .Publish()
                .RefCount()
                .SubscribeOn(Scheduler.Default);
    }
}

这允许多个订阅者通过.Publish().RefCount()共享一个输入。.SubscribeOn(Scheduler.Default)将订阅推送到一个新线程-没有它,您将阻塞订阅。

如果您将Task.Run((Action) GetInput);移动到订阅后,您的代码将按预期工作。这是因为在您的原始版本中,OnNewInputEvent(Console.ReadLine())的第一次调用是在您将OnNewInputLineEvent挂接到observer.OnNext之前运行的。

相关内容

  • 没有找到相关文章

最新更新