我有一个可观察的元素序列,这些元素具有char Key
属性,其值范围从'A'
到'E'
.我想根据这个键对这些元素进行分组。对它们进行分组后,我希望结果由可观察的组组成,以便我可以单独处理每个组。我的问题是我找不到一种很好的方法来保留最终可观察量中每个组的键。以下是我正在尝试执行的操作的示例:
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(42)
.GroupBy(n => (char)(65 + n % 5))
.Select(grouped => grouped.ToArray())
.Merge();
observable.Subscribe(group =>
Console.WriteLine($"Group: {String.Join(", ", group)}"));
输出:
Group: 0, 5, 10, 15, 20, 25, 30, 35, 40
Group: 1, 6, 11, 16, 21, 26, 31, 36, 41
Group: 2, 7, 12, 17, 22, 27, 32, 37
Group: 3, 8, 13, 18, 23, 28, 33, 38
Group: 4, 9, 14, 19, 24, 29, 34, 39
组是正确的,但键('A'
-'E'
)丢失。observable
的类型是IObservable<long[]>
.我想要什么 相反,是一个IObservable<IGrouping<char, long>>
。这样,group.Key
将在最终订阅代码中可用。但据我所知 没有内置方法可以将IGroupedObservable
(GroupBy
运算符的结果)转换为IGrouping
。我可以看到运算符ToArray
,ToList
、ToLookup
、ToDictionary
等,但不是ToGrouping
运算符。我的问题是,我如何实现这个运算符?
这是我实现它的不完整尝试:
public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
this IGroupedObservable<TKey, TSource> source)
{
return Observable.Create<IGrouping<TKey, TSource>>(observer =>
{
// What to do?
return source.Subscribe();
});
}
我的目的是在原始示例中使用它而不是ToArray
,如下所示:
.Select(grouped => grouped.ToGrouping())
这完成了您想要的大部分操作:
var observable = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(42)
.GroupBy(n => (char)(65 + n % 5))
.SelectMany(grouped => grouped.ToArray().Select(a => (key: grouped.Key, results: a)));
那是IObservable<ValueTuple<TKey, TResult[]>
.如果你想要IGrouping
接口,你必须创建一个对象,因为我认为没有一个适合你的对象:
public static class Grouping
{
// Because I'm too lazy to code types
public static Grouping<TKey, TResult> Create<TKey, TResult>(TKey key, IEnumerable<TResult> results)
{
return new Grouping<TKey, TResult>(key, results);
}
}
public class Grouping<TKey, TResult> : IGrouping<TKey, TResult>
{
public Grouping(TKey key, IEnumerable<TResult> results)
{
this.Key = key;
this.Results = results;
}
public TKey Key { get; }
public IEnumerable<TResult> Results { get; }
public IEnumerator<TResult> GetEnumerator()
{
return Results.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return Results.GetEnumerator();
}
}
那么你的可观察量就变成了:
var o2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(42)
.GroupBy(n => (char)(65 + n % 5))
.SelectMany(grouped => grouped.ToArray().Select(a => Grouping.Create(grouped.Key, a)));
这似乎是你想要的:
IObservable<(char Key, long[] Values)> observable =
Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Take(42)
.GroupBy(n => (char)(65 + n % 5))
.Select(grouped => new { Key = grouped.Key, Values = grouped.ToArray() })
.SelectMany(x => x.Values, (k, v) => (Key: k.Key, Values: v));
observable.Subscribe(group =>
Console.WriteLine($"Group {group.Key}: {String.Join(", ", group.Values)}"));
我得到:
Group A: 0, 5, 10, 15, 20, 25, 30, 35, 40
Group B: 1, 6, 11, 16, 21, 26, 31, 36, 41
Group C: 2, 7, 12, 17, 22, 27, 32, 37
Group D: 3, 8, 13, 18, 23, 28, 33, 38
Group E: 4, 9, 14, 19, 24, 29, 34, 39
我找到了一种实现ToGrouping
运算符的方法,而无需创建实现IGrouping
接口的自定义类。它比Shlomo的解决方案更简洁,但效率更低。
/// <summary>
/// Creates an observable sequence containing a single 'IGrouping' that has the same
/// key with the source 'IGroupedObservable', and contains all of its elements.
/// </summary>
public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
this IGroupedObservable<TKey, TSource> source)
{
return source
.ToList()
.Select(list => list.GroupBy(_ => source.Key).Single());
}
此实现假定TKey
类型不会以某种疯狂的方式实现IEquatable
接口,它为同一值返回不同的哈希代码,或者认为值不等于自身。如果发生这种情况,Single
LINQ 运算符将引发异常。