>假设我们有一个仅输出的 Stream,它输出字节。来自 Stream 的数据是序列化消息,每条消息始终以字节序列 (0xAA, 0xBB, 0xCC
开头,但消息的长度未知。
目前,我创建了一个可观察量并发出流上的每一个字节,然后订阅这个可观察量,缓冲每个发射,找到字节序列,然后发出缓冲区。类似的东西
List<byte> buffer = new List<byte>();
dataStream.subscribe(b => {
buffer.add(b);
int[] idx = SearchSequence(buffer);
if(idx.Length < 2){
// TODO: wait for more data
}
else{
messageStream.onNext(buffer.GetRange(idx[0], idx[1]));
// TODO: remove them from buffer
}
})
有没有更优雅的方法来解决这个问题?据我所知,有两个问题:
- 消息的长度不是固定的,这使 Observable.buffer() 无效
- 在订阅 dataStream(或从它派生的其他可观察对象)时,Stream 的输出可能在消息的中间。
更新:
如何检测消息的结尾?
消息之间没有间隙,消息彼此相邻。因此,消息的起始序列(0xAA, 0xBB, 0xCC
)也是前一条消息的结束序列
您的输入可观察量是什么样的?
我当前的代码是这样的:
Observer<byte> ob = null;
var dataStream = Observable.Create<byte>(o => ob = o);
while(true){
ob.OnNext(ms.ReadByte());
}
您希望输出可观察的外观是什么样子的?
发出消息的可观察量
Observable<byte[]>
我不知道这有多优雅,但也许它会让你(或其他人)开始。我假设您希望从消息中排除(0xAA, 0xBB, 0xCC)
标头:
var s = new Subject<byte>();
IObservable<byte[]> results = s.Publish(_s => Observable.When(_s
.And(_s.Skip(1))
.And(_s.Skip(2))
.Then((a, b, c) => (a, b, c))
))
.Publish(_t => _t
.Buffer(_t.Where(t => t.a == 0xAA && t.b == 0xBB && t.c == 0xCC))
.Select(l => (l[l.Count - 1].a == 0xAA && l[l.Count - 1].b == 0xBB && l[l.Count - 1].c == 0xCC
? l.Take(l.Count - 3)
: l
)
.Select(e => e.c)
.ToArray()
)
.Skip(1)
)
;
解释:
我们首先使用And/Then/When
来执行双压缩,因此流(0xAA, 0xBB, 0xCC, 0x01, 0x02, 0x03, 0xAA, 0xBB, 0xCC, 0x01, 0x02, 0x03)
变成如下所示的元组流:
(0xAA, 0xBB, 0xCC)
(0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
(0x02, 0x03, 0xAA)
(0x03, 0xAA, 0xBB)
(0xAA, 0xBB, 0xCC)
(0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
然后,我们使用.Where
来嗅探出一个看起来像(0xAA, 0xBB, 0xCC)
的元组,并将其用作缓冲区边界。
一旦你有了缓冲区边界,它们实际上在消息启动器发生后被切断,所以你最终会得到我们的两个消息的示例流,你最终会得到三个元组列表:
List 1: (0xAA, 0xBB, 0xCC)
List 2: (0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
(0x02, 0x03, 0xAA)
(0x03, 0xAA, 0xBB)
(0xAA, 0xBB, 0xCC)
List 3: (0xBB, 0xCC, 0x01)
(0xCC, 0x01, 0x02)
(0x01, 0x02, 0x03)
我们想要在每个列表中显示的字节基本上是第三列,但是如果我们想从消息中排除消息引入,那么我们必须做一些清理:我们必须从列表 2(以及所有其他"中间"列表中排除最后三个元素),我们必须删除列表 1,并且我们必须保留列表 3。删除第一个列表由末尾的.Skip(1)
完成。从中间列表中去除最后三个元素是通过检查列表中的最后一个元素是否(0xAA, 0xBB, 0xCC)
来完成的,如果是,则取除最后三个元素之外的所有元素。
鉴于所有这些,我希望有更好的方法来做到这一点。
下面是一些运行器代码:
results.Dump(); //Linqpad
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0x01);
s.OnNext(0x02);
s.OnNext(0x03);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0x01);
s.OnNext(0x02);
s.OnNext(0x03);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0xCC);
s.OnNext(0xCC);
s.OnNext(0xAA);
s.OnNext(0xBB);
s.OnNext(0x04);
s.OnNext(0x05);
s.OnNext(0x06);
s.OnNext(0x07);
s.OnCompleted();
输出:
01 02 03
01 02 03 AA BB
CC AA BB 04 05 06 07
这是一个自定义Window
运算符,它根据提供的分隔符数组将序列拆分为子序列。它基于接受IObservable<TWindowBoundary> windowBoundaries
参数的内置Window
运算符。
/// <summary>Projects each element of an observable sequence into non-overlapping
/// windows that are separated by the provided separator.</summary>
public static IObservable<IObservable<T>> Window<T>(
this IObservable<T> source,
T[] separator,
IEqualityComparer<T> comparer = null)
{
return Observable.Defer(() =>
{
var boundaries = new Subject<Unit>();
var queue = new Queue<T>(separator.Length);
return source
.Do(x =>
{
if (queue.Count == separator.Length) queue.Dequeue();
queue.Enqueue(x);
if (queue.SequenceEqual(separator, comparer))
{
queue.Clear();
boundaries.OnNext(default);
}
})
.Concat(Observable.Repeat(default(T), separator.Length - 1))
.SkipLast(separator.Length - 1)
.Window(boundaries)
.Select((window, i) => i == 0 ? window : window.Skip(separator.Length));
});
}
Subject
用于通知Window
操作员已检测到新边界。检测机制包括一个保存最后发出的元素的Queue
。每次发出新元素时,都会将此队列与separator
进行比较。Window
操作员通过separator.Length - 1
元素故意落后于检测机制,以便生成的窗口正确对齐。
使用示例:
IObservable<byte> dataStream = GetDataStream();
IObservable<byte[]> messageStream = dataStream
.Window(new byte[] { 0xAA, 0xBB, 0xCC })
.SelectMany(window => window.ToArray());