命令反应性扩展事件



我在多个线程中通过UDP接收消息。每次接收后,我提出MessageReceived.OnNext(message)

因为我使用了多个线程,所以引发的消息是无序的,这是一个问题。

我怎样才能通过信息计数器命令提升信息?(假设有一个message.counter属性)

必须记住,一条消息可能会在通信中丢失(比如说,如果我们在X条消息后有一个计数器孔,该孔没有被填满,我会提出下一条消息)

必须尽快发出消息(如果收到下一个计数器)

在声明检测丢失消息的要求时,您没有考虑最后一条消息未到达的可能性;我添加了一个timeoutDuration,如果在给定时间内没有任何消息到达,它会刷新缓冲的消息——您可能希望将其视为错误,请参阅注释了解如何执行此操作。

我将通过定义一个具有以下签名的扩展方法来解决这个问题:

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    TimeSpan timeoutDuration = new TimeSpan(),
    int gapTolerance = 0)
  • source是未排序的消息流
  • keySelector是从消息中提取int密钥的函数。我假设所寻求的第一个密钥是0;必要时进行修改
  • timeoutDuration如上所述,如果省略,则不存在超时
  • tolerance是在等待无序消息时保留的最大消息数。传递0以保存任意数量的消息
  • scheduler是用于超时的调度程序,它是为测试目的而提供的,如果没有给定,则使用默认值

演练

我将在这里进行逐行演练。下文将重复全面实施。

指定默认计划程序

首先,如果没有提供,我们必须分配一个默认的调度器:

scheduler = scheduler ?? Scheduler.Default;

排列超时

现在,如果请求超时,我们将用一个副本替换源,如果消息没有到达timeoutDuration,该副本将简单地终止并发送OnCompleted

if(timeoutDuration != TimeSpan.Zero)
    source = source.Timeout(
        timeoutDuration,
        Observable.Empty<TSource>(),
        scheduler);

如果您希望发送TimeoutException,只需删除Timeout的第二个参数——空流,即可选择一个执行此操作的重载。请注意,我们可以安全地与所有订户共享,因此它位于对Observable.Create的呼叫之外。

创建订阅处理程序

我们使用Observable.Create来构建我们的流。无论何时发生订阅,都会调用作为Create参数的lambda函数,并向我们传递调用观测器(o)。Create返回我们的IObservable<T>,所以我们在这里返回它。

return Observable.Create<TSource>(o => { ...

初始化一些变量

我们将跟踪nextKey中的下一个预期键值,并创建一个SortedDictionary来保存无序消息,直到它们可以发送为止。

int nextKey = 0;  
var buffer = new SortedDictionary<int, TSource>();

订阅源并处理消息

现在我们可以订阅消息流(可能会应用超时)。首先,我们介绍了OnNext处理程序。下一条消息分配给x:

return source.Subscribe(x => { ...

我们调用keySelector函数从消息中提取密钥:

var key = keySelector(x);

如果消息有一个旧密钥(因为它超过了我们对无序消息的容忍度),我们只需要删除它并处理此消息(您可能需要采取不同的行动):

// drop stale keys
if(key < nextKey) return;

否则,我们可能会有预期的密钥,在这种情况下,我们可以增加nextKey发送消息:

if(key == nextKey)
{
    nextKey++;
    o.OnNext(x);                    
}

或者,我们可能有一个无序的未来消息,在这种情况下,我们必须将其添加到缓冲区中。如果我们这样做,我们还必须确保我们的缓冲区没有超过我们对存储无序消息的容忍度-在这种情况下,我们还将把nextKey添加到缓冲区中的第一个密钥,因为它是SortedDictionary,所以很方便地是下一个最低的密钥:

else if(key > nextKey)
{
    buffer.Add(key, x);
    if(gapTolerance != 0 && buffer.Count > gapTolerance)
        nextKey = buffer.First().Key;
}

现在,不管上面的结果如何,我们都需要清空现在准备就绪的任何密钥的缓冲区。为此,我们使用了一个helper方法。请注意,它会调整nextKey,因此我们必须小心通过引用传递它。我们只需在缓冲区上循环读取、删除和发送消息,只要密钥相互跟随,每次递增nextKey

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}

处理错误

接下来,我们提供一个OnError处理程序——如果您选择这样做,它将通过任何错误,包括Timeout异常。

冲洗缓冲器

最后,我们必须处理OnCompleted。在这里,我选择清空缓冲区——如果一条无序的消息占用了消息,并且从未到达,这将是必要的。这就是我们需要超时的原因:

() => {
    // empty buffer on completion
    foreach(var item in buffer)
        o.OnNext(item.Value);                
    o.OnCompleted();
});

全面实施

以下是完整的实施。

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    int gapTolerance = 0,
    TimeSpan timeoutDuration = new TimeSpan(),
    IScheduler scheduler = null)
{       
    scheduler = scheduler ?? Scheduler.Default;
    if(timeoutDuration != TimeSpan.Zero)
        source = source.Timeout(
            timeoutDuration,
            Observable.Empty<TSource>(),
            scheduler);
    return Observable.Create<TSource>(o => {
        int nextKey = 0;  
        var buffer = new SortedDictionary<int, TSource>();
        return source.Subscribe(x => {
            var key = keySelector(x);
            // drop stale keys
            if(key < nextKey) return;
            if(key == nextKey)
            {
                nextKey++;
                o.OnNext(x);                    
            }
            else if(key > nextKey)
            {
                buffer.Add(key, x);
                if(gapTolerance != 0 && buffer.Count > gapTolerance)
                    nextKey = buffer.First().Key;
            }
            SendNextConsecutiveKeys(ref nextKey, o, buffer);
        },
        o.OnError,
        () => {
            // empty buffer on completion
            foreach(var item in buffer)
                o.OnNext(item.Value);                
            o.OnCompleted();
        });
    });
}
private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}

测试线束

如果您在控制台应用程序中包含nuget rx-testing,则会运行以下程序,为您提供一个可供使用的测试工具:

public static void Main()
{
    var tests = new Tests();
    tests.Test();
}
public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable(
            OnNext(100, 0),
            OnNext(200, 2),
            OnNext(300, 1),
            OnNext(400, 4),
            OnNext(500, 5),
            OnNext(600, 3),
            OnNext(700, 7),
            OnNext(800, 8),
            OnNext(900, 9),            
            OnNext(1000, 6),
            OnNext(1100, 12),
            OnCompleted(1200, 0));
        //var results = scheduler.CreateObserver<int>();
        xs.Sort(
            keySelector: x => x,
            gapTolerance: 2,
            timeoutDuration: TimeSpan.FromTicks(200),
            scheduler: scheduler).Subscribe(Console.WriteLine);
        scheduler.Start();
    }
}

结束语

这里有各种有趣的替代方法。我选择了这种基本上势在必行的方法,因为我认为这是最容易遵循的方法,但你可能会用一些花哨的分组恶作剧来做到这一点。我知道Rx有一点是一贯正确的,那就是总是有很多方法可以剥猫皮!

我对这里的超时想法也不太满意——在生产系统中,我想实现一些检查连接的方法,比如心跳或类似的方法。我没有涉及这一点,因为很明显,它将是特定于应用程序的。此外,心跳也曾在这些论坛和其他地方讨论过(比如在我的博客上)。

如果你想要可靠的排序,强烈考虑使用TCP,这就是它的用途;否则,你将被迫用UDP玩猜谜游戏,有时你会错的。

例如,假设您按以下顺序接收以下数据报:[A,B,D]

当你收到D时,你应该等C到达多久才能推D?

无论你选择什么持续时间,你都可能错了:

  1. 如果C在传输过程中丢失,因此永远不会到达,该怎么办
  2. 如果你选择的持续时间太短,你最终推了D,但却收到了C,该怎么办

也许你可以选择一个启发式效果最好的持续时间,但为什么不直接使用TCP呢?

旁注:

MessageReceived.OnNext意味着您正在使用Subject<T>,这可能是不必要的。考虑将异步UdpClient方法直接转换为可观测值,或者通过Observable.Create<T>(async (observer, cancel) => { ... })编写异步迭代器来转换它们。

相关内容

  • 没有找到相关文章

最新更新