可观察到的序列Rx C#的多个订阅



我试图在.NET中学习RX框架,并在应用它的过程中是我正在从事的任务之一。在下面的这个简单用例中,我尝试的功能都在说明。

我有一系列iEnumerable,我需要并行处理,并将函数(异步函数(应用于返回int。基于返回的值。在下面的代码中,仅在创建原始"序列"之后创建了新序列和均匀序列,因此这里没有真正的"火与忘记"。什么是使用RX解决此问题的正确方法?

    public void TestRx()
    {
        IEnumerable<string> inputs = new string[] { "T-1", "T-2", "T-3"};
        var sequence = inputs.ToObservable().Select(x => ReturnResult(x)).Merge();
        var oddSequence = sequence.Where(x => (x%2) != 0);
        var evenSequence = sequence.Where(x => (x%2) == 0);
        oddSequence.Subscribe(i => Console.WriteLine($"ODD VALUE {i}"));
        evenSequence.Subscribe(i => Console.WriteLine($"EVEN VALUE {i}"));
    }
    public async Task<int> ReturnResult(string s)
    {
        int result = -1;
        Int32.TryParse(s.Split('-')[1], out result);
        return result;
    }

谢谢你帮助我

这将完成工作:

private Random random = new Random();
public void TextRx()
{
    new[] {"T-1", "T-2", "T-3"}
        .Select(s => ReturnResult(s).ToObservable())
        .Merge()
        .GroupBy(i => i % 2 == 0)
        .SelectMany(g => g.Select(i => (g.Key, Value: i)))
        .Subscribe(o => Console.WriteLine($"{(o.Key ? "EVEN" : "ODD")} VALUE {o.Value}"));
}
public async Task<int> ReturnResult(string s)
{
    await Task.Delay(random.Next(5000));
    int.TryParse(s.Split('-')[1], out var result);
    return result;
}
  • 它使用ValueTuple,这是C#7功能。您需要用Nuget抓住它。当然,您可以使用匿名对象,但是我更喜欢ValueTuple。它还使用C#7中的out变量和C#6的字符串插值。

相关内容

  • 没有找到相关文章

最新更新