<T> 使用反应式扩展创建操作的通用调度程序



我正在尝试很多响应式扩展,现在我正在尝试做一个系统,在这个系统中,我可以将过程排队,并以任何我想要的方式执行它们,同时能够向订阅者发送通知。

我目前将我的数据库访问封装在UserAccess类中,该类公开了添加用户的方法。在该方法中,我希望将一个将用户添加到数据库的操作排队。因此,我创建了一个JobProcessor of T类,它公开了一个方法QueueJob(Action),并让我的User实现这个类。我的问题是我看不出如何从可观察对象的OnNext方法中调用Action,因为该Action需要User参数。

我的攻角一定是错的,我对设计的把握一定有问题。例如,我知道我应该以某种方式将我的用户传递给QueueJob过程,但我不知道如何以一种干净的方式做到这一点。

    public class UserAccess : JobProcessor<User>
    {
        public void AddUser(User user)
        {
            QueueJob(usr =>
                     {
                         using (var db = new CenterPlaceModelContainer())
                         {
                             db.Users.Add(usr);
                         }
                     });
         [...]
    public abstract class JobProcessor<T>
    {
        // Either Subject<T> or Subject<Action<T>>
        private Subject<Action<T>> JobSubject = new Subject<Action<T>>();
        public JobProcessor()
        {
            JobSubject
            /* Insert Rx Operators Here */
            .Subscribe(OnJobNext, OnJobError, OnJobComplete);
        }
        private void OnJobNext(Action<T> action)
        {
            // ???
        }
        private void OnJobError(Exception exception)
        {
        }
        private void OnJobComplete()
        {
        }
        public void QueueJob(Action<T> action)
        {
            JobSubject.OnNext(action);
        }
    }
编辑1:

我试图将QueueJob的签名更改为

QueueJob(T entity, Action<T> action)

现在我可以写

QueueJob(user, usr => { ... } );

但这似乎不是很直观。我还没见过很多同时传递实体和动作的框架。这样我就不需要JobProcessor了。

Edit 2:我将JobProcessor的主题类型更改为subject,完全删除了T。因为不需要在过程中包含User,因为我可以从外部引用它。现在唯一的问题是,如果传递给QueueJob的User I的操作在action执行的实际时间之间发生了变化,则用户将拥有修改后的信息。不喜欢,但我想我会继续寻找解决方案。

我的代码现在是(使用Buffer作为示例):

public abstract class JobProcessor
{
   public Subject<Action> JobSubject = new Subject<Action>();
   public JobProcessor()
   {
       JobSubject
           .Buffer(3)
           .Subscribe(OnJobNext, OnJobError, OnJobComplete);
   }
   private void OnJobNext(IList<Action> actionsList)
   {
       foreach (var element in actionsList)
        {
            element();
        }
   }
   private void OnJobError(Exception exception)
   {
   }
   private void OnJobComplete()
   {
   }
   public void QueueJob(Action action)
   {
       JobSubject.OnNext(action);
   }
}

首先,我必须同意Lee和NSGaga的观点,你可能不希望这样做——还有其他的生产者/消费者队列模式,它们更符合(我认为)你在这里想要完成的任务。

也就是说,既然我无法抗拒挑战…通过一些微小的调整,您可以通过捕获传入的用户参数并使其成为直接的Action来消除"我将什么传递到操作中?"的直接问题-以下是经过一些修改的代码:

public class UserAccess : JobProcessor
{
    public void AddUser(User user)
    {
        QueueJob(() =>
                 {
                     using (var db = new CenterPlaceModelContainer())
                     {
                         db.Users.Add(user);
                     }
                 });
     [...]
public abstract class JobProcessor
{
    // Subject<Action>
    private Subject<Action> JobSubject = new Subject<Action>();
    public JobProcessor()
    {
        JobSubject
        /* Insert Rx Operators Here */
        .Subscribe(OnJobNext, OnJobError, OnJobComplete);
    }
    private void OnJobNext(Action action)
    {
        // Log something saying "Yo, I'm executing an action" here?
        action();
    }
    private void OnJobError(Exception exception)
    {
        // Log something saying "Yo, something broke" here?
    }
    private void OnJobComplete()
    {
        // Log something saying "Yo, we shut down" here?
    }
    public void QueueJob(Action action)
    {
        JobSubject.OnNext(action);
    }
}

我不太确定你的"目标"是什么——但我认为你有点搞反了……

通常情况下,对象会通过
等属性暴露IObservable<Action<T>> NewJob {get{return _subject;}}
…什么的。(主体变得可观察-主体在本质上是双重的-以及为什么它是特定的-有点争议-但很适合玩耍等)

从类内部调用OnNext -就像你做的那样。

但是你自己通常不会订阅可观察对象
…您可以让外部用户通过"挂钩"到您的属性并定义订阅来实现这一点,从而在他们到达时获得新项目。

这当然是简化了的,有很多情况和用途,但这可能会有所帮助,我希望

我的第一反应是IObservable通常最适合创建不可变数据结构序列,而不是方法指针/委托/操作。

接下来我建议,如果你正在尝试以队列方式"调度"操作,那么Rx中的isscheduler实现似乎是一个完美的选择!

或者,如果你实际上试图创建一个ProduceConsumer队列,那么我不认为Rx实际上是最适合的。例如,如果你将一堆消息放入队列中,然后让一些消费者读取这些消息并处理它们,我会考虑使用不同的框架。

我最终确定了我的设计,并找到了我喜欢的东西。这里是代码,如果有人需要它。

public class JobProcessor<T> : IDisposable where T : new()
{
    private ISubject<Action<T>> jobsProcessor = new Subject<Action<T>>();
    private IDisposable disposer;
    private T _jobProvider = new T();
    public JobProcessor(Func<ISubject<Action<T>>, IObservable<IEnumerable<Action<T>>>> initializer)
    {
        Console.WriteLine("Entering JobProcessor Constructor");
        disposer = initializer(jobsProcessor)
            .Subscribe(OnJobsNext, OnJobsError, OnJobsComplete);
        Console.WriteLine("Leaving JobProcessor Constructor");
    }
    private void OnJobsNext(IEnumerable<Action<T>> actions)
    {
        Debug.WriteLine("Entering OnJobsNext");
        foreach (var action in actions)
        {
            action(_jobProvider);
        }
        Debug.WriteLine("Leaving OnJobsNext");
    }
    private void OnJobsError(Exception ex)
    {
        Debug.WriteLine("Entering OnJobsError");
        Debug.WriteLine(ex.Message);
        Debug.WriteLine("Leaving OnJobsError");
    }
    private void OnJobsComplete()
    {
        Debug.WriteLine("Entering OnJobsComplete");
        Debug.WriteLine("Leaving OnJobsComplete");
    }
    public void QueueJob(Action<T> action)
    {
        Debug.WriteLine("Entering QueueJobs");
        jobsProcessor.OnNext(action);
        Debug.WriteLine("Leaving QueueJobs");
    }
    public void Dispose()
    {
        disposer.Dispose();
    }
}

我选择了一个通用的make来支持一个分层的体系结构,我可以在并发层中使用JobProcessor,在并发层中我可以选择执行的快慢。JobProcessor构造函数接受一个函数,该函数用于在代码的其他地方声明Observable序列,并生成一个处理器,该处理器按照序列所描述的顺序执行作业。OnNext接受一个IEnumerable>,以便能够支持像. buffer(3)这样的序列,它同时返回一批动作。这样做的缺点是,当创建一个序列时,每次返回单个动作,我需要这样做

var x = new JobProcessor<DatabaseAccess<User>>(subject => subject.Select(action => action.Yield()));

T的Yield()扩展方法返回单个元素的可枚举对象。我发现它在这里传递一个单独的项目作为IEnumerable.

相关内容

  • 没有找到相关文章

最新更新