我正在尝试很多响应式扩展,现在我正在尝试做一个系统,在这个系统中,我可以将过程排队,并以任何我想要的方式执行它们,同时能够向订阅者发送通知。
我目前将我的数据库访问封装在UserAccess类中,该类公开了添加用户的方法。在该方法中,我希望将一个将用户添加到数据库的操作排队。因此,我创建了一个JobProcessor of T类,它公开了一个方法QueueJob(Action),并让我的User实现这个类。我的问题是我看不出如何从可观察对象的OnNext方法中调用Action,因为该Action需要User参数。
我的攻角一定是错的,我对设计的把握一定有问题。例如,我知道我应该以某种方式将我的用户传递给QueueJob过程,但我不知道如何以一种干净的方式做到这一点。
public class UserAccess : JobProcessor<User>
{
public void AddUser(User user)
{
QueueJob(usr =>
{
using (var db = new CenterPlaceModelContainer())
{
db.Users.Add(usr);
}
});
[...]
public abstract class JobProcessor<T>
{
// Either Subject<T> or Subject<Action<T>>
private Subject<Action<T>> JobSubject = new Subject<Action<T>>();
public JobProcessor()
{
JobSubject
/* Insert Rx Operators Here */
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}
private void OnJobNext(Action<T> action)
{
// ???
}
private void OnJobError(Exception exception)
{
}
private void OnJobComplete()
{
}
public void QueueJob(Action<T> action)
{
JobSubject.OnNext(action);
}
}
编辑1:
我试图将QueueJob的签名更改为
QueueJob(T entity, Action<T> action)
现在我可以写
QueueJob(user, usr => { ... } );
但这似乎不是很直观。我还没见过很多同时传递实体和动作的框架。这样我就不需要JobProcessor了。
Edit 2:我将JobProcessor的主题类型更改为subject,完全删除了T。因为不需要在过程中包含User,因为我可以从外部引用它。现在唯一的问题是,如果传递给QueueJob的User I的操作在action执行的实际时间之间发生了变化,则用户将拥有修改后的信息。不喜欢,但我想我会继续寻找解决方案。
我的代码现在是(使用Buffer作为示例):
public abstract class JobProcessor
{
public Subject<Action> JobSubject = new Subject<Action>();
public JobProcessor()
{
JobSubject
.Buffer(3)
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}
private void OnJobNext(IList<Action> actionsList)
{
foreach (var element in actionsList)
{
element();
}
}
private void OnJobError(Exception exception)
{
}
private void OnJobComplete()
{
}
public void QueueJob(Action action)
{
JobSubject.OnNext(action);
}
}
首先,我必须同意Lee和NSGaga的观点,你可能不希望这样做——还有其他的生产者/消费者队列模式,它们更符合(我认为)你在这里想要完成的任务。
也就是说,既然我无法抗拒挑战…通过一些微小的调整,您可以通过捕获传入的用户参数并使其成为直接的Action
来消除"我将什么传递到操作中?"的直接问题-以下是经过一些修改的代码:
public class UserAccess : JobProcessor
{
public void AddUser(User user)
{
QueueJob(() =>
{
using (var db = new CenterPlaceModelContainer())
{
db.Users.Add(user);
}
});
[...]
public abstract class JobProcessor
{
// Subject<Action>
private Subject<Action> JobSubject = new Subject<Action>();
public JobProcessor()
{
JobSubject
/* Insert Rx Operators Here */
.Subscribe(OnJobNext, OnJobError, OnJobComplete);
}
private void OnJobNext(Action action)
{
// Log something saying "Yo, I'm executing an action" here?
action();
}
private void OnJobError(Exception exception)
{
// Log something saying "Yo, something broke" here?
}
private void OnJobComplete()
{
// Log something saying "Yo, we shut down" here?
}
public void QueueJob(Action action)
{
JobSubject.OnNext(action);
}
}
我不太确定你的"目标"是什么——但我认为你有点搞反了……
通常情况下,对象会通过
等属性暴露IObservable<Action<T>> NewJob {get{return _subject;}}
…什么的。(主体变得可观察-主体在本质上是双重的-以及为什么它是特定的-有点争议-但很适合玩耍等)
从类内部调用OnNext
-就像你做的那样。
但是你自己通常不会订阅可观察对象
…您可以让外部用户通过"挂钩"到您的属性并定义订阅来实现这一点,从而在他们到达时获得新项目。
这当然是简化了的,有很多情况和用途,但这可能会有所帮助,我希望
我的第一反应是IObservable通常最适合创建不可变数据结构序列,而不是方法指针/委托/操作。
接下来我建议,如果你正在尝试以队列方式"调度"操作,那么Rx中的isscheduler实现似乎是一个完美的选择!
或者,如果你实际上试图创建一个ProduceConsumer队列,那么我不认为Rx实际上是最适合的。例如,如果你将一堆消息放入队列中,然后让一些消费者读取这些消息并处理它们,我会考虑使用不同的框架。
我最终确定了我的设计,并找到了我喜欢的东西。这里是代码,如果有人需要它。
public class JobProcessor<T> : IDisposable where T : new()
{
private ISubject<Action<T>> jobsProcessor = new Subject<Action<T>>();
private IDisposable disposer;
private T _jobProvider = new T();
public JobProcessor(Func<ISubject<Action<T>>, IObservable<IEnumerable<Action<T>>>> initializer)
{
Console.WriteLine("Entering JobProcessor Constructor");
disposer = initializer(jobsProcessor)
.Subscribe(OnJobsNext, OnJobsError, OnJobsComplete);
Console.WriteLine("Leaving JobProcessor Constructor");
}
private void OnJobsNext(IEnumerable<Action<T>> actions)
{
Debug.WriteLine("Entering OnJobsNext");
foreach (var action in actions)
{
action(_jobProvider);
}
Debug.WriteLine("Leaving OnJobsNext");
}
private void OnJobsError(Exception ex)
{
Debug.WriteLine("Entering OnJobsError");
Debug.WriteLine(ex.Message);
Debug.WriteLine("Leaving OnJobsError");
}
private void OnJobsComplete()
{
Debug.WriteLine("Entering OnJobsComplete");
Debug.WriteLine("Leaving OnJobsComplete");
}
public void QueueJob(Action<T> action)
{
Debug.WriteLine("Entering QueueJobs");
jobsProcessor.OnNext(action);
Debug.WriteLine("Leaving QueueJobs");
}
public void Dispose()
{
disposer.Dispose();
}
}
我选择了一个通用的make来支持一个分层的体系结构,我可以在并发层中使用JobProcessor,在并发层中我可以选择执行的快慢。JobProcessor构造函数接受一个函数,该函数用于在代码的其他地方声明Observable序列,并生成一个处理器,该处理器按照序列所描述的顺序执行作业。OnNext接受一个IEnumerable>,以便能够支持像. buffer(3)这样的序列,它同时返回一批动作。这样做的缺点是,当创建一个序列时,每次返回单个动作,我需要这样做
var x = new JobProcessor<DatabaseAccess<User>>(subject => subject.Select(action => action.Yield()));
T的Yield()扩展方法返回单个元素的可枚举对象。我发现它在这里传递一个单独的项目作为IEnumerable