我有一个基于轮询的协议,我想利用RX将其转换为基于推送的协议。每隔x秒,我使用协议请求标记(名称和值),并从中获得它们。
我只对改变标签中的值感兴趣,所以我使用DistinctUntilChanges函数。
this.TagsChangeNotifier = _tags
.Select(tag =>
{
return Observable
.Interval(ts)
.Select(_ => { return tag; })
.DistinctUntilChanged(new DataTagComparer());
})
.Merge();
这里是DataTagcomparer类
public class DataTagComparer : IEqualityComparer<DataTag>
{
public bool Equals(DataTag x, DataTag y)
{
b = y.WeakRawValue.ToByteArray().SequenceEqual(x.WeakRawValue.ToByteArray());
return b;
}
public int GetHashCode(DataTag obj)
{
return obj.Name.GetHashCode();
}
}
但是看起来无法工作,因为我永远看不到两个不同值之间的比较。下面是一个例子。
Start program: DataTag("Test",1)
Equals called: x = ("Test",1), y = ("Test",1)
等待10秒,从协议更改为返回2而不是1。
Equals called: x = ("Test",1), y = ("Test",1)
Equals called: x = ("Test",2), y = ("Test",2)
Equals called: x = ("Test",2), y = ("Test",2)
Equals called: x = ("Test",2), y = ("Test",2)
等等
奇怪的是,它完全没有比较之前和当前的值!你知道可能是什么问题吗?实际上,我正在使用这个糟糕的变通方法
public class DataTagComparer : IEqualityComparer<DataTag>
{
private object val;
public bool Equals(DataTag x, DataTag y)
{
bool b = true;
if (val != null)
b = val.ToByteArray().SequenceEqual(x.WeakRawValue.ToByteArray());
val = x.WeakRawValue;
return b;
}
public int GetHashCode(DataTag obj)
{
return obj.Name.GetHashCode();
}
}
谢谢你的关注,Vincenzo .
编辑:DataTag类代码public abstract class DataTag
{
public DataTag(string _Name, string Desc)
{
Name = _Name;
Description = Desc;
}
public string Name { get; private set; }
public string Description { get; private set; }
public abstract object WeakValue { get; }
public abstract object WeakRawValue { get; }
}
编辑:标签更新功能
this.timerHandle = Observable.Interval(ts).Select(_ => { Update(); return _; }).Publish().Connect();
这....根据你的描述,看起来不太对——尽管我可能误解了你的意思……
this.TagsChangeNotifier = _tags
// for each tag value in tags...
.Select(tag =>
{
// Tick off every TimeSpan ts, then...
return Observable.Interval(ts)
// Say we've "ticked"
.Do(tick => Console.WriteLine("It's time to tick!"))
// Return the value "tag" (which remains constant...)
.Select(_ => { return tag; })
// Say what we see
.Do(t => Console.WriteLine("I see a {0}!", t))
// But only when it's different from the last one
// (but we never change the value?)
.DistinctUntilChanged(new DataTagComparer());
})
// And mash them all together into one stream
.Merge();
我想这一切都取决于_tags
是什么,在某种程度上DataTag
的定义是什么,但我不认为这是你真正想要的。
让我们画出流程-从_tags
开始,目前我假设它是IObservable
:
Time _tags
| tag1
| tag2
| tag3
到目前为止,一切顺利-现在,我们为每个Select
创建一个Interval
:
Time _tags
| tag1
| ---- Interval
|
| tag2
| ---- Interval
|
| tag3
| ---- Interval
|
我们打勾一段时间,每次重新选择标签:
Time _tags
| tag1
| ---- Interval
| -------Tick -> tag1
| -------Tick -> tag1
| -------Tick -> tag1
| tag2
| ---- Interval
| -------Tick -> tag2
| -------Tick -> tag2
| -------Tick -> tag2
| tag3
| ---- Interval
| -------Tick -> tag3
| -------Tick -> tag3
| -------Tick -> tag3
|
然后添加DistinctUntilChanged
:
Time _tags
| tag1
| ---- Interval
| -------Tick -> tag1 ---> tag1
| -------Tick -> tag1 -X
| -------Tick -> tag1 -X
| tag2
| ---- Interval
| -------Tick -> tag2 ---> tag2
| -------Tick -> tag2 -X
| -------Tick -> tag2 -X
| tag3
| ---- Interval
| -------Tick -> tag3 ---> tag3
| -------Tick -> tag3 -X
| -------Tick -> tag3 -X
|
最后我们Merge
子流:
Time _tags Output
| tag1 |
| ---- Interval |
| -------Tick -> tag1 ---> tag1 tag1
| -------Tick -> tag1 -X |
| -------Tick -> tag1 -X |
| tag2 |
| ---- Interval |
| -------Tick -> tag2 ---> tag2 tag2
| -------Tick -> tag2 -X |
| -------Tick -> tag2 -X |
| tag3 |
| ---- Interval |
| -------Tick -> tag3 ---> tag3 tag3
| -------Tick -> tag3 -X |
| -------Tick -> tag3 -X |
|
所以,如果你所需要做的就是捕捉值流的变化,你可以尝试这样做:
// my fake source of "tags", in this case simple strings
var subject = new Subject<string>();
var source = subject.Publish().RefCount();
// Still want to track "distinct chains"
var distincts = source.DistinctUntilChanged();
// But we also want to "look into the future", and see the *next* distinct chain
var futureDistincts = source.DistinctUntilChanged().Skip(1);
// A "delta" occurs when a distinct chain ends, so we'll zip the two
// sequences together (since they are "now distinct" and "now + 1", this will mean changes)
var onlyDeltas = distincts
.Zip(futureDistincts, (before,after) => Tuple.Create(before,after));
using(onlyDeltas.Subscribe(Console.WriteLine))
{
subject.OnNext("Foo");
subject.OnNext("Foo");
subject.OnNext("Foo");
subject.OnNext("Bar"); // BAM: triggers an output value of (Foo, Bar)
subject.OnNext("Bar");
subject.OnNext("Foo"); // BAM: triggers an output value of (Bar, Foo)
}