在 Reactive Extensions(Rx)中按预定义顺序对可观察序列进行排序



假设我有一个类型T

// Demo element type used throughout the question: an identifier plus a payload character.
class T {
public int identifier; //Arbitrary but unique for each character (Guids in real-life)
public char character; //In real life not a char, but I chose char here for easy demo purposes
}

我有一个预定义的有序标识符序列:

// The predefined order in which identifiers must be emitted (4 is expected twice).
int[] identifierSequence = { 9, 3, 4, 4, 7 };

我现在需要对一个产生以下对象序列的 IObservable<T> 进行排序:

{identifier: 3, character 'e'},
{identifier: 9, character 'h'},
{identifier: 4, character 'l'},
{identifier: 4, character 'l'},
{identifier: 7, character 'o'}

使得到的 IObservable 依次产生 hello。我不想使用 ToArray,因为我希望对象一到达就能尽快收到,而不是等到整个序列都观察完毕。更具体地说,我希望按如下方式接收它们:

Input: e  h  l  l  o
Output:    he l  l  o

用响应式(Rx)的正确做法是什么?我目前能想到的最好方案是:

// Items that arrived before their turn, keyed by identifier.
// NOTE: only one item is kept per identifier, so a second item with the same
// identifier overwrites the first while both are waiting.
Dictionary<int, T> buffer = new Dictionary<int, T>();
// Position in identifierSequence of the identifier we are currently waiting for.
// NOTE: buffer and curIndex live outside the query, so every subscription shares them.
int curIndex = 0;
inputObserable.SelectMany(item =>
{
    buffer[item.identifier] = item;

    // Yields every buffered item that is next in line and stops at the first gap.
    IEnumerable<T> GetReadyElements()
    {
        // The bound check matters: without it, emitting the final item advances
        // curIndex to identifierSequence.Length and the next lookup throws
        // IndexOutOfRangeException.
        while (curIndex < identifierSequence.Length)
        {
            T nextItem;
            if (!buffer.TryGetValue(identifierSequence[curIndex], out nextItem))
            {
                // The item we are waiting for has not arrived yet.
                yield break;
            }
            buffer.Remove(nextItem.identifier);
            curIndex++;
            yield return nextItem;
        }
    }
    return GetReadyElements();
});

编辑:

Schlomo 在我的代码中提出了一些非常有效的问题,这就是为什么我将他的答案标记为正确。我对他的代码进行了一些修改,使其可用:

  • 标识符类型和对象类型都改为泛型
  • 用迭代代替递归,以防止在非常大的可观察序列上出现潜在的堆栈溢出
  • 将匿名类型改为具名的真实类,以提高可读性
  • 尽可能只在字典中查找一次值并存入变量,而不是多次查找
  • 修正了代码风格

这给了我以下代码:

/// <summary>
/// Re-orders <paramref name="source"/> so that items are emitted in the order given by
/// <paramref name="identifierSequence"/>. Items that arrive early are buffered; items that share
/// an identifier are emitted in arrival order. The result completes as soon as every position
/// of the sequence has been emitted.
/// </summary>
/// <typeparam name="T">Element type of the source observable.</typeparam>
/// <typeparam name="TId">Identifier type used to match elements against the sequence.</typeparam>
/// <param name="source">The observable to re-order.</param>
/// <param name="identifierSequence">The identifiers in the order in which matching items must be emitted.</param>
/// <param name="identifierFunc">Extracts the identifier of an item.</param>
/// <returns>An observable emitting the matched items in sequence order.</returns>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
    return source.Scan(initialState, (oldState, item) =>
    {
        // Step 1: add the new item to the (immutable) buffer under its identifier.
        TId itemIdentifier = identifierFunc(item);
        ImmutableList<T> valuesList;
        if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
        {
            valuesList = ImmutableList<T>.Empty;
        }
        ImmutableDictionary<TId, ImmutableList<T>> buffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

        // Step 2: move every item that is now next in line from the buffer to the output.
        // This is a loop rather than recursion so very long sequences cannot overflow the stack.
        int index = oldState.Index;
        List<T> output = new List<T>();
        while (index < identifierSequence.Count)
        {
            TId key = identifierSequence[index];
            ImmutableList<T> nextValues;
            if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
            {
                // The item we are waiting for has not arrived yet.
                break;
            }
            // Take the oldest buffered item so duplicates keep their arrival order.
            output.Add(nextValues[0]);
            buffer = buffer.SetItem(key, nextValues.RemoveAt(0));
            index++;
        }
        return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
    })
    // Turn each state into OnNext notifications, plus OnCompleted once the whole
    // sequence has been emitted; Dematerialize converts them back into a stream.
    .SelectMany(output =>
    {
        var notifications = output.Output
            .Select(item => Notification.CreateOnNext(item))
            .ToList();
        if (output.Index == identifierSequence.Count)
        {
            notifications.Add(Notification.CreateOnCompleted<T>());
        }
        return notifications;
    })
    .Dematerialize();
}
/// <summary>
/// Immutable accumulator passed between Scan steps: the position reached in the identifier
/// sequence, the items still waiting, and the items released by the latest step.
/// </summary>
class OrderByIdentifierSequenceState<T, TId>
{
    public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
    {
        Index = index;
        Buffer = buffer;
        Output = output;
    }

    /// <summary>Position in the identifier sequence of the item we are waiting on.</summary>
    public int Index { get; }

    /// <summary>Items that have arrived but that we are not ready to emit yet, grouped by identifier.</summary>
    public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }

    /// <summary>Items that can safely be emitted.</summary>
    public IEnumerable<T> Output { get; }
}

但是,此代码仍然存在一些问题:

  • 不断复制状态(主要是 ImmutableDictionary),这可能非常昂贵。可能的解决方案:为每个观察者维护一份单独的状态,而不是为每个收到的项目复制一份。
  • 当 identifierSequence 中的一个或多个元素在源可观察序列中不存在时,会出现问题:目前这会使排序后的可观察序列一直阻塞,永远不会完成。可能的解决方案:设置超时;在源可观察序列完成时抛出异常;在源可观察序列完成时返回所有可用的项目;……
  • 当源可观察序列包含的元素多于 identifierSequence 时,会发生内存泄漏。存在于源可观察序列中、但不在 identifierSequence 中的项目会被添加到字典里,并且直到源可观察序列完成才会被移除。可能的解决方案:在把项目添加到字典之前先检查它是否在 identifierSequence 中;绕过这段代码并立即输出该项目;……

我的解决方案:

/// <summary>
/// Takes the items from the source observable and returns them in the order specified in identifierSequence.
/// Items that arrive early are buffered per subscription; items sharing an identifier are emitted in arrival order.
/// If an item is missing from the source observable, the returned observable emits items up to the missing item
/// and then waits until the source completes; at that point all remaining buffered items are emitted in sequence
/// order, skipping the positions that never arrived. The order is therefore always correct, but items may be missing.
/// Items whose identifier does not occur in identifierSequence are ignored.
/// The result completes as soon as every position has been emitted, or when the source completes.
/// </summary>
/// <typeparam name="T">The type that is produced by the source observable</typeparam>
/// <typeparam name="TId">The type of the identifiers used to identify a T</typeparam>
/// <param name="source">The source observable</param>
/// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param>
/// <param name="identifierFunc">A function that takes a T and outputs its identifier</param>
/// <returns>An observable with the matched elements of the source, ordered by the sequence of items in identifierSequence</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }
    if (identifierSequence == null)
    {
        throw new ArgumentNullException(nameof(identifierSequence));
    }
    if (identifierFunc == null)
    {
        throw new ArgumentNullException(nameof(identifierFunc));
    }

    // Snapshot the sequence so that later changes to the caller's list cannot get
    // out of step with the lookup set or with a subscription's current index.
    TId[] sequence = identifierSequence.ToArray();
    if (sequence.Length == 0)
    {
        return Observable.Empty<T>();
    }
    HashSet<TId> identifiersInSequence = new HashSet<TId>(sequence);

    return Observable.Create<T>(observer =>
    {
        // Index of the position in the sequence we are currently waiting for.
        int index = 0;
        // Set once OnCompleted has been signalled; later notifications are ignored.
        bool completed = false;
        // Items received ahead of their turn, oldest first per identifier.
        Dictionary<TId, Queue<T>> buffer = new Dictionary<TId, Queue<T>>();

        // Emits buffered items for consecutive positions starting at index.
        // With skipMissing == false it stops at the first position that has no item yet;
        // with skipMissing == true it steps over such positions (used when the source has ended).
        void Drain(bool skipMissing)
        {
            while (index < sequence.Length)
            {
                Queue<T> pending;
                if (buffer.TryGetValue(sequence[index], out pending) && pending.Count > 0)
                {
                    observer.OnNext(pending.Dequeue());
                }
                else if (!skipMissing)
                {
                    return;
                }
                index++;
            }
        }

        void Complete()
        {
            if (completed)
            {
                return;
            }
            completed = true;
            buffer.Clear();
            observer.OnCompleted();
        }

        return source
            // Keep identifierFunc inside an operator so an exception it throws becomes OnError.
            .Select(item => new KeyValuePair<TId, T>(identifierFunc(item), item))
            .Subscribe(
                pair =>
                {
                    // Ignore everything after completion and items that are not part of the sequence.
                    if (completed || !identifiersInSequence.Contains(pair.Key))
                    {
                        return;
                    }
                    Queue<T> pending;
                    if (!buffer.TryGetValue(pair.Key, out pending))
                    {
                        pending = new Queue<T>();
                        buffer[pair.Key] = pending;
                    }
                    pending.Enqueue(pair.Value);
                    Drain(skipMissing: false);
                    if (index == sequence.Length)
                    {
                        Complete();
                    }
                },
                observer.OnError,
                () =>
                {
                    if (completed)
                    {
                        return;
                    }
                    // The source ended early: flush what we have, in order, skipping missing positions.
                    Drain(skipMissing: true);
                    Complete();
                });
    });
}

请注意,您的实现存在一些问题:

  1. 如果两个"l"都在轮到它们之前到达,其中一个会被吞掉,随后整个序列都会被卡住。字典应该映射到集合,而不是单个项目。
  2. 没有 OnCompleted 消息。
  3. 多个订阅者可能会把状态搞乱。试试这个(其中 GetPatternMatchOriginal 是你的代码):

-

// Repro for shared state: subscribe twice to the same query built from the question's code.
var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 });
// Two independent subscriptions; each would be expected to see the same first three items.
stateMachine.Take(3).Dump(); //Linqpad
stateMachine.Take(3).Dump(); //Linqpad

第一个输出 h e l,第二个输出 l o。它们都应该输出 h e l。

此实现解决了这些问题,并且使用不可变的数据结构也没有副作用:

public static class X
{
    /// <summary>
    /// Re-orders <paramref name="source"/> so that items are emitted in the order given by
    /// <paramref name="identifierSequence"/>. All state is carried through Scan in immutable
    /// structures, so every subscription gets its own independent state.
    /// The result completes once every position of the sequence has been emitted.
    /// </summary>
    /// <param name="source">The items to re-order.</param>
    /// <param name="identifierSequence">
    /// Identifiers in the order in which matching items must be emitted. Declared as int[] so that
    /// it matches the integer identifiers used as buffer keys and the documented call site.
    /// </param>
    /// <returns>An observable emitting the matched items in sequence order.</returns>
    public static IObservable<T> GetStateMachine(this IObservable<T> source, int[] identifierSequence)
    {
        //State is held in an anonymous type:
        //  Index shows what identifier we're waiting on,
        //  Buffer holds items that have arrived that we aren't ready for yet,
        //  Output holds items that can be safely emitted.
        return source
            .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
                (state, item) =>
                {
                    // Add the new item to the buffer under its identifier (single lookup).
                    ImmutableList<T> arrived;
                    if (!state.Buffer.TryGetValue(item.Identifier, out arrived))
                    {
                        arrived = ImmutableList<T>.Empty;
                    }
                    var buffer = state.Buffer.SetItem(item.Identifier, arrived.Add(item));

                    // Release every item that is now next in line. A loop is used instead of
                    // recursion so long sequences cannot overflow the stack.
                    var index = state.Index;
                    var ready = new List<T>();
                    while (index < identifierSequence.Length)
                    {
                        var key = identifierSequence[index];
                        ImmutableList<T> waiting;
                        if (!buffer.TryGetValue(key, out waiting) || waiting.IsEmpty)
                        {
                            // The item we are waiting for has not arrived yet.
                            break;
                        }
                        // Oldest first, so items sharing an identifier keep their arrival order.
                        ready.Add(waiting[0]);
                        buffer = buffer.SetItem(key, waiting.RemoveAt(0));
                        index++;
                    }
                    return new { Index = index, Buffer = buffer, Output = ready.AsEnumerable() };
                })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                if (output.Index == identifierSequence.Length)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }
                return notifications;
            })
            .Dematerialize();
    }
}

然后你可以这样称呼它:

// Build the ordered query and subscribe to it (Dump is LINQPad's output helper).
var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 });
stateMachine.Dump(); //LinqPad
// Push the items out of order; identifier 9 is first in the sequence but is pushed last.
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 7, Character = 'o' });
src.OnNext(new T { Identifier = 3, Character = 'e' });
src.OnNext(new T { Identifier = 9, Character = 'h' });

鉴于你有这个:

// Sample input: the letters of "hello", with 'e' arriving before 'h'.
T[] items =
{
    new T { identifier = 3, character = 'e' },
    new T { identifier = 9, character = 'h' },
    new T { identifier = 4, character = 'l' },
    new T { identifier = 4, character = 'l' },
    new T { identifier = 7, character = 'o' },
};
IObservable<T> source = items.ToObservable();

// The order in which identifiers must be emitted.
int[] identifierSequence = { 9, 3, 4, 4, 7 };

那么这样就可以:

// Re-orders source according to identifierSequence.
// The Scan state carries: index (the position we are waiting for), pendings (items that
// arrived early) and outputs (items released by the latest step); SelectMany then
// flattens the released items into the result stream.
IObservable<T> query =
    source
        .Scan(new { index = 0, pendings = new List<T>(), outputs = new List<T>() }, (a, t) =>
        {
            var i = a.index;
            var o = new List<T>();
            // Copy instead of mutating a.pendings: the seed object (and its list) is created
            // once and reused by every subscription, so in-place changes would leak between subscribers.
            var pendings = new List<T>(a.pendings) { t };
            // Stop at the end of the sequence; otherwise identifierSequence[i] throws
            // IndexOutOfRangeException when unmatched items are still pending.
            while (i < identifierSequence.Length)
            {
                var r = pendings.FirstOrDefault(x => x.identifier == identifierSequence[i]);
                if (r == null)
                {
                    // The item we are waiting for has not arrived yet.
                    break;
                }
                o.Add(r);
                pendings.Remove(r);
                i++;
            }
            return new { index = i, pendings, outputs = o };
        })
        .SelectMany(x => x.outputs);

好问题 :-)

鉴于多个相同的键,对我来说,它看起来像是任意顺序的模式匹配。这是我想出的:

编辑:修改为在字典中查找预期项目。

public static class MyExtensions
{
/// <summary>
/// Emits values from <paramref name="source"/> in the order given by <paramref name="keys"/>.
/// Every entry of <paramref name="keys"/> becomes one slot; an arriving value fills the earliest
/// unfilled slot for its key, and filled slots are emitted from the front of the pattern.
/// Values whose key has no unfilled slot are ignored. The result completes when every slot
/// has been emitted, or when the source completes.
/// </summary>
/// <param name="source">The observable to re-order.</param>
/// <param name="keys">The keys in the required output order; enumerated once per subscription.</param>
/// <param name="keySelector">Extracts the key of a value.</param>
/// <param name="keyComparer">Comparer for keys; EqualityComparer&lt;TKey&gt;.Default when null.</param>
/// <exception cref="ArgumentNullException">source, keys or keySelector is null.</exception>
public static IObservable<TSource> MatchByKeys<TSource, TKey>(this IObservable<TSource> source, IEnumerable<TKey> keys, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> keyComparer = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (keys == null) throw new ArgumentNullException(nameof(keys));
if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
if (keyComparer == null) keyComparer = EqualityComparer<TKey>.Default;
return Observable.Create<TSource>(observer =>
{
// pattern: all slots in output order. matchesByKey: the still-unfilled slots per key.
// Both collections reference the same slot objects.
var pattern = new LinkedList<SingleAssignment<TSource>>();
var matchesByKey = new Dictionary<TKey, LinkedList<SingleAssignment<TSource>>>(keyComparer);
foreach (var key in keys)
{
var match = new SingleAssignment<TSource>();
pattern.AddLast(match);
LinkedList<SingleAssignment<TSource>> matches;
if (!matchesByKey.TryGetValue(key, out matches))
{
matches = new LinkedList<SingleAssignment<TSource>>();
matchesByKey.Add(key, matches);
}
matches.AddLast(match);
}
// No keys at all: complete immediately without subscribing to the source.
if (pattern.First == null)
{
observer.OnCompleted();
return Disposable.Empty;
}
var sourceSubscription = new SingleAssignmentDisposable();
// Shared teardown: unsubscribe from the source and drop all buffered state.
Action dispose = () =>
{
sourceSubscription.Dispose();
pattern.Clear();
matchesByKey.Clear();
};
sourceSubscription.Disposable = source.Subscribe(
value =>
{
try
{
var key = keySelector(value);
LinkedList<SingleAssignment<TSource>> matches;
// Key not expected (or all of its slots are already filled): ignore the value.
if (!matchesByKey.TryGetValue(key, out matches)) return;
// Fill the earliest unfilled slot for this key.
matches.First.Value.Value = value;
matches.RemoveFirst();
if (matches.First == null) matchesByKey.Remove(key);
// Emit every filled slot at the front of the pattern.
while (pattern.First != null && pattern.First.Value.HasValue)
{
var match = pattern.First.Value;
pattern.RemoveFirst();
observer.OnNext(match.Value);
}
// Slots remain: keep listening. Otherwise everything was emitted, so finish.
if (pattern.First != null) return;
dispose();
observer.OnCompleted();
}
catch (Exception ex)
{
// Any exception thrown above (including from keySelector) is forwarded as OnError.
dispose();
observer.OnError(ex);
}
},
error =>
{
dispose();
observer.OnError(error);
},
() =>
{
// Source ended first: dispose() clears the pattern, so slots that were filled
// but not yet emitted are discarded before completion is forwarded.
dispose();
observer.OnCompleted();
});
return Disposable.Create(dispose);
});
}
/// <summary>
/// A slot that can be assigned exactly once; reading before assignment or assigning twice
/// throws InvalidOperationException.
/// </summary>
private sealed class SingleAssignment<T>
{
public bool HasValue { get; private set; }
private T _value;
public T Value
{
get
{
if (!HasValue) throw new InvalidOperationException("No value has been set.");
return _value;
}
set
{
if (HasValue) throw new InvalidOperationException("Value has alredy been set.");
HasValue = true;
_value = value;
}
}
}
}

测试代码:

// Demo: push the letters of "hello" with 'e' ahead of 'h' and collect the re-ordered output.
var src = new Subject<T>();
var ordered = src.MatchByKeys(new[] { 9, 3, 4, 4, 7 }, t => t.Identifier);
var result = new List<T>();
// The subscription is disposed when the using block ends.
using (ordered.Subscribe(result.Add))
{
src.OnNext(new T { Identifier = 3, Character = 'e' });
src.OnNext(new T { Identifier = 9, Character = 'h' });
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 7, Character = 'o' });
src.OnCompleted();
}
// With the key order 9, 3, 4, 4, 7 the collected characters spell "hello".
Console.WriteLine(new string(result.Select(t => t.Character).ToArray()));

相关内容

  • 没有找到相关文章

最新更新