RX -使用自定义的IEqualityComparer时出现明显错误



我有一个基于轮询的协议,我想利用RX将其转换为基于推送的协议。每隔x秒,我使用协议请求标记(名称和值),并从中获得它们。

我只对改变标签中的值感兴趣,所以我使用DistinctUntilChanges函数。

this.TagsChangeNotifier = _tags
    .Select(tag => 
    { 
        return Observable
            .Interval(ts)
            .Select(_ => { return tag; })
            .DistinctUntilChanged(new DataTagComparer()); 
    })
    .Merge();

这里是DataTagcomparer类

public class DataTagComparer : IEqualityComparer<DataTag>
{
    public bool Equals(DataTag x, DataTag y)
    {
            b = y.WeakRawValue.ToByteArray().SequenceEqual(x.WeakRawValue.ToByteArray());
        return b;
    }
    public int GetHashCode(DataTag obj)
    {
        return obj.Name.GetHashCode();
    }
}

但是看起来无法工作,因为我永远看不到两个不同值之间的比较。下面是一个例子。

Start program: DataTag("Test",1)
Equals called: x = ("Test",1), y = ("Test",1)

等待10秒,从协议更改为返回2而不是1。

Equals called: x = ("Test",1), y = ("Test",1)
Equals called: x = ("Test",2), y = ("Test",2)
Equals called: x = ("Test",2), y = ("Test",2)
Equals called: x = ("Test",2), y = ("Test",2)

等等

奇怪的是,它完全没有比较之前和当前的值!你知道可能是什么问题吗?实际上,我正在使用这个糟糕的变通方法

public class DataTagComparer : IEqualityComparer<DataTag>
{
    private object val;
    public bool Equals(DataTag x, DataTag y)
    {
        bool b = true;
        if (val != null)
            b = val.ToByteArray().SequenceEqual(x.WeakRawValue.ToByteArray());
        val = x.WeakRawValue;
        return b;
    }
    public int GetHashCode(DataTag obj)
    {
        return obj.Name.GetHashCode();
    }
}

谢谢你的关注,Vincenzo .

编辑:DataTag类代码
public abstract class DataTag
{
    public DataTag(string _Name, string Desc)
    {
        Name = _Name;
        Description = Desc;
    }
    public string Name { get; private set; }
    public string Description { get; private set; }
    public abstract object WeakValue { get; }
    public abstract object WeakRawValue { get; }
}

编辑:标签更新功能

this.timerHandle = Observable.Interval(ts).Select(_ => { Update(); return _; }).Publish().Connect();

这....根据你的描述,看起来不太对——尽管我可能误解了你的意思……

this.TagsChangeNotifier = _tags
    // for each tag value in tags...
    .Select(tag => 
    { 
        // Tick off every TimeSpan ts, then...
        return Observable.Interval(ts)
            // Say we've "ticked"
            .Do(tick => Console.WriteLine("It's time to tick!"))
            // Return the value "tag" (which remains constant...)
            .Select(_ => { return tag; })
            // Say what we see
            .Do(t => Console.WriteLine("I see a {0}!", t))
            // But only when it's different from the last one
            // (but we never change the value?)
            .DistinctUntilChanged(new DataTagComparer()); 
    })    
    // And mash them all together into one stream
    .Merge();

我想这一切都取决于_tags是什么,在某种程度上DataTag的定义是什么,但我不认为这是你真正想要的。

编辑:

让我们画出流程-从_tags开始,目前我假设它是IObservable:

Time   _tags
  |    tag1
  |    tag2
  |    tag3

到目前为止,一切顺利-现在,我们为每个Select创建一个Interval:

Time   _tags
  |    tag1
  |     ---- Interval
  |
  |    tag2
  |     ---- Interval
  |
  |    tag3
  |     ---- Interval
  |

我们打勾一段时间,每次重新选择标签:

Time   _tags
  |    tag1
  |     ---- Interval
  |            -------Tick -> tag1
  |            -------Tick -> tag1
  |            -------Tick -> tag1
  |    tag2
  |     ---- Interval
  |            -------Tick -> tag2
  |            -------Tick -> tag2
  |            -------Tick -> tag2
  |    tag3
  |     ---- Interval
  |            -------Tick -> tag3
  |            -------Tick -> tag3
  |            -------Tick -> tag3
  |

然后添加DistinctUntilChanged:

Time   _tags
  |    tag1
  |     ---- Interval
  |            -------Tick -> tag1 ---> tag1
  |            -------Tick -> tag1 -X
  |            -------Tick -> tag1 -X
  |    tag2
  |     ---- Interval
  |            -------Tick -> tag2 ---> tag2
  |            -------Tick -> tag2 -X
  |            -------Tick -> tag2 -X
  |    tag3
  |     ---- Interval
  |            -------Tick -> tag3 ---> tag3
  |            -------Tick -> tag3 -X
  |            -------Tick -> tag3 -X
  |

最后我们Merge子流:

Time   _tags                                      Output
  |    tag1                                         |
  |     ---- Interval                              |
  |            -------Tick -> tag1 ---> tag1       tag1
  |            -------Tick -> tag1 -X              |
  |            -------Tick -> tag1 -X              |
  |    tag2                                         |
  |     ---- Interval                              |
  |            -------Tick -> tag2 ---> tag2       tag2
  |            -------Tick -> tag2 -X              |
  |            -------Tick -> tag2 -X              |
  |    tag3                                         |
  |     ---- Interval                              |
  |            -------Tick -> tag3 ---> tag3       tag3
  |            -------Tick -> tag3 -X              |
  |            -------Tick -> tag3 -X              |
  |

所以,如果你所需要做的就是捕捉值流的变化,你可以尝试这样做:

// my fake source of "tags", in this case simple strings
var subject = new Subject<string>();
var source = subject.Publish().RefCount();
// Still want to track "distinct chains"
var distincts = source.DistinctUntilChanged();
// But we also want to "look into the future", and see the *next* distinct chain
var futureDistincts = source.DistinctUntilChanged().Skip(1);
// A "delta" occurs when a distinct chain ends, so we'll zip the two
// sequences together (since they are "now distinct" and "now + 1", this will mean changes)
var onlyDeltas = distincts
    .Zip(futureDistincts, (before,after) => Tuple.Create(before,after));
using(onlyDeltas.Subscribe(Console.WriteLine))
{
    subject.OnNext("Foo");
    subject.OnNext("Foo");
    subject.OnNext("Foo");
    subject.OnNext("Bar");  // BAM: triggers an output value of (Foo, Bar)
    subject.OnNext("Bar");
    subject.OnNext("Foo"); // BAM: triggers an output value of (Bar, Foo)
}

相关内容

  • 没有找到相关文章

最新更新