反应式扩展:修改处理缓慢的现有流



我正在学习Rx,我正在尝试将以下问题转换为Rx管道。看起来应该有一个简单的 Rx 解决方案,但我找不到它。下面是一些简单的 C# 代码来演示该问题:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Item = System.Collections.Generic.KeyValuePair<int, string>;
namespace Sample
{
class Test
{
readonly object _sync = new object();
readonly List<Item> _workList = new List<Item>();
public void Update(IEnumerable<Item> items)
{
lock(_sync)
{
foreach (var item in items)
{
bool found = false;
for (int i = 0; i < _workList.Count; ++i)
{
if (_workList[i].Key == item.Key)
{
_workList[i] = item;
found = true;
break;
}
}
if (!found)
{
_workList.Add(item);
}
}
}
}
public void Run()
{
void ThreadMethod(object _)
{
while (true)
{
Item? item = null;
lock (_sync)
{
if (_workList.Any())
{
item = _workList[0];
_workList.RemoveAt(0);
}
}
if (item.HasValue)
{
var str = $"{item.Value.Key} : {item.Value.Value}";
Console.WriteLine($"Start {str}");
Thread.Sleep(5000); // simluate work
Console.WriteLine($"End {str}");
}
}
}
var thread = new Thread(ThreadMethod);
thread.Start();
}
}
}

"更新"事件由键/值对列表组成。更新将与具有以下规则的现有列表合并。不保证每个已知密钥都会出现在每次更新中

  • 如果找到该键,则会在列表中的当前位置替换该值。以前的值将被丢弃,并且不会被处理。
  • 如果未找到键,则该项将添加到列表末尾

单独的线程一次处理一个项目的列表。此处理需要一些时间(由 Thread.sleep 模拟(。处理项目时,将从列表的开头删除项目。

如您所见,在处理单个项目期间,积压工作中的项目可能会就地发生突变。关键是,只会为每个键处理收到的最新值,但积压工作中键的顺序不能更改(除非处理键时会将其从列表中删除(。如果在列表中重新引入键,则会将其添加到末尾(。

我对 Rx 的最新尝试是将更新输入到 Scan 函数中,该函数将以前未知的键转换为主题,然后在合并所有最新值之前将每个键的新值输入到其相应的主题中,但它并没有完全起作用。

请不要讨论非 Rx 解决方案。上面的简单代码可以完成这项工作,但我想了解是否有 Rx 解决方案。

我正在使用C#(System.Reactive(,但我很乐意接受Rx其他方言的解决方案。

您需要两种机制来实现您的目标。第一个是地图,它为您提供发出项目的最新值。二是flatMap()算子。

Map<String, String> currentSourceValue = new HashMap<>();

我正在使用String作为数据类型以及keyOf()valOf()方法。

此方法将使用最新值更新地图。如果已经存在当前值,请替换它并返回可观察empty()

synchronized Observable<String> setLatestValue( String s ) {
String r = currentSourceValue.put( keyOf( s ), valOf( s ) );
return r == null ? Observable.just( s ) : Observable.empty();
}

如果可以发出,此方法将从映射中提取值。

synchronized Observable<String> getLatestValue( String s ) {
String r = currentSourceValue.remove( keyOf( s ) );
return r == null ? Observable.empty() : Observable.just( r );
}

这将允许发出最新值

source
.flatMap( s -> setLatestValue( s ) )
.observeOn( processingScheduler )
.flatMap( s -> getLatestValue( s ), 1 )
.subscribe( s -> process( s ) );

第一个flatMap()运算符更新传入流的最新值。如果此键的队列中已有项,则返回可观察empty(),以便下游链中不占用任何空间。

第二个flatMap()运算符在处理线程上工作。要flatMap()的第二个参数表示应一次处理一个项目,没有并行性。如果地图中存在值,它将发出一个值,如果不存在,它将不发出任何值,并清除地图条目。理论上,第二个flatMap()可以只发出一个值,但是当观察者链从一个线程跳到另一个上游时,存在一些非确定性。

synchronized关键字指示地图上的操作是原子的,并防止从下游地图中删除值,就像将其添加到上游地图一样。

此解决方案的工作方式类似于groupBy()运算符,但它处理您只想处理给定键的最新值的情况。

这将起作用,尽管我不是它的最大粉丝。

我认为这是一种生产者/消费者的情况:一个线程在创造工作,另一个线程在做。producer主题表示添加工作的线程。其他一切都代表了事物的消费者方面。如果你要把它class起来,producer会去一个班级,其他一切都在另一个班级。

completedKeys保存已完成的键,因此弹出该键的状态:具有该键的新项目将转到行的后面。readyGate表示消费者何时可以处理下一件事。将其与最新的工作相结合是棘手的部分。WithLatestFrom工作得很好,直到你得到一个空列表。.Where().FirstAsync()很好地完成了等待部分。

所有这一切的关键是GroupByUntil:将事物分组,它们自然而然地落入键最初添加的顺序,这就是您想要的。Until子句意味着我们可以关闭可观察量,这将使具有旧键的新项目位于行的后面。DynamicCombinedLatest将所有这些可观察量转换为列表,这实际上是您的状态。

无论如何,你去吧:

var producer = new Subject<Item>();
var readyGate = new Subject<Unit>();
var completedKeys = new Subject<int>();
var Process = new Action<Item>(kvp =>
{
var str = $"{kvp.Key} : {kvp.Value}";
Console.WriteLine($"Start {str}");
Thread.Sleep(500); // simluate work
Console.WriteLine($"End {str}");
});
var groups = producer
.GroupByUntil(kvp => kvp.Key, kvp => kvp, go => completedKeys.Where(k => k == go.Key))
.DynamicCombineLatest();
var q = groups.Publish(_groups => readyGate
.ObserveOn(NewThreadScheduler.Default)
.WithLatestFrom(groups, (_, l) => l)
.SelectMany(l => l.Count == 0
? _groups.Where(g => g.Count > 0).FirstAsync()
: Observable.Return(l)
)
)
.Subscribe(l =>
{
var kvp = l[0];
completedKeys.OnNext(kvp.Key);
Process(kvp);
readyGate.OnNext(Unit.Default);
});

//Runner code:
producer.OnNext(new Item(1, "1-a"));
producer.OnNext(new Item(1, "1-b"));
producer.OnNext(new Item(2, "2-a"));
producer.OnNext(new Item(2, "2-b"));
readyGate.OnNext(Unit.Default);
await Task.Delay(TimeSpan.FromMilliseconds(100)); //to test if 1 gets done again and goes to the back of the line.
producer.OnNext(new Item(1, "1-c"));

这是DynamicCombinedLatest(使用 nuget 包System.Collections.Immutable(:

public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
{
return source
.SelectMany((o, i) => o.Materialize().Select(notification => (observableIndex: i, notification: notification)))
.Scan((exception: (Exception)null, dict: ImmutableDictionary<int, T>.Empty), (state, t) => t.notification.Kind == NotificationKind.OnNext
? ((Exception)null, state.dict.SetItem(t.observableIndex, t.notification.Value))
: t.notification.Kind == NotificationKind.OnCompleted
? ((Exception)null, state.dict.Remove(t.observableIndex))
: (t.notification.Exception, state.dict)
)
.Select(t => t.exception == null
? Notification.CreateOnNext(t.dict)
: Notification.CreateOnError<ImmutableDictionary<int, T>>(t.exception)
)
.Dematerialize()
.Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
}

最新更新