如何将传入的任务推送到切换的Observable中



好的,所以我目前正在尝试创建一个文件写入队列。每个文件一次只能写入一个。我有一个控制器,它接受要写入的文件名和数据字节数组,并在写入完成时返回一个Task。

问题是我不需要写得太多。我希望对给定文件的每个请求都被放入一个"bucket"中,只写最近的请求,忽略其他请求。我会称之为"队列",但能够从末尾删除项目与队列的定义背道而驰(至少在我看来)。

有没有一种简单的方法可以用Observable做到这一点?让我试着把台阶原子化。

  • 接受字符串和字节数组
  • 如果字符串中有正在处理的项,请将Task放在"deck"上
  • 如果字符串有一个项"在甲板上",则使用新数据更新该项

我的第二步和第三步有问题。我尝试了一个简单的按文件名切换,但我遇到了一个问题,即它不等待已经运行的文件完成后再启动。它也会立即返回,而不是等待新项目完成。

如果您能提供任何帮助,我们将不胜感激!

我提出了这个实现,它允许对不同文件进行并发更新,并利用Rx调度器实现简单快速的可测试性。UpdateFile例程只会休眠5个虚拟秒,但您可以针对实际情况进行更改。

我返回的不是普通的Task,而是Task<string>,其中的结果是更新的文件名。我还为更新负载使用了一个通用类型TData来支持测试。

我确信在效率上还有改进的空间,但这似乎是一个正确的、相当有效的实现:

public class FileUpdateController<TData>
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, TaskCompletionSource<string>> _pendingTasks;
    private readonly Dictionary<string, TaskCompletionSource<string>> _runningTasks;
    private readonly Dictionary<string, TData> _dataCache;
    private readonly IScheduler _scheduler;
    public FileUpdateController(IScheduler scheduler)
    {
        if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
        _scheduler = scheduler;
        _pendingTasks = new Dictionary<string, TaskCompletionSource<string>>();
        _runningTasks = new Dictionary<string, TaskCompletionSource<string>>();
        _dataCache = new Dictionary<string, TData>();
    }
    public Task<string> QueueFileUpdate(string filename, TData data)
    {
        lock (_gate)
        {
            if (!_pendingTasks.ContainsKey(filename))
            {
                var tcs = new TaskCompletionSource<string>(filename);
                _pendingTasks.Add(filename, tcs);
            }
            var task = _pendingTasks[filename].Task;
            if (!_runningTasks.ContainsKey(filename))
            {
                MoveToRunning(filename, data);                    
            }
            else
            {
                _dataCache[filename] = data;
            }
            return task;
        }            
    }
    private void MoveToRunning(string filename, TData data)
    {
        _runningTasks.Add(filename, _pendingTasks[filename]);
        _pendingTasks.Remove(filename);
        _scheduler.Schedule(() => UpdateFile(filename, data));
    }
    private async void UpdateFile(string filename, TData data)
    {
        Console.WriteLine("Updating file " + filename + " with data " + data);
        await Observable.Timer(TimeSpan.FromSeconds(5), _scheduler).ToTask();
        lock (_gate)
        {
            var tcs = _runningTasks[filename];
            _runningTasks.Remove(filename);
            if (_pendingTasks.ContainsKey(filename))
            {
                var cachedData = _dataCache[filename];
                _dataCache.Remove(filename);
                MoveToRunning(filename, cachedData);                    
            }
            tcs.SetResult(filename);
            Console.WriteLine("Updated file " + filename + " with data " + data);
        }            
    }
}

下面是我的一些测试(使用nuget包rx-testing和nunit)。他们确实缺乏测试是否更新了正确数据的能力,但你可以期待测试控制台的输出:

public class FileUpdateControllerTests
{
    [Test]
    public void SingleUpdate()
    {
        var scheduler = new TestScheduler();
        var sut = new FileUpdateController<string>(scheduler);
        var task = sut.QueueFileUpdate("test", "1");
        Assert.AreNotEqual(TaskStatus.RanToCompletion, task.Status);
        scheduler.Start();
        Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);
        Assert.AreEqual("test", task.Result);
    }
    [Test]
    public void TwoUpdatesToSameFile()
    {
        var scheduler = new TestScheduler();
        var sut = new FileUpdateController<string>(scheduler);
        var task1 = sut.QueueFileUpdate("test", "1");
        var task2 = sut.QueueFileUpdate("test", "2");
        Assert.AreNotEqual(TaskStatus.RanToCompletion, task1.Status);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
        Assert.AreNotEqual(TaskStatus.RanToCompletion, task2.Status);
        Assert.AreEqual(TaskStatus.RanToCompletion, task1.Status);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 2);
        Assert.AreEqual(TaskStatus.RanToCompletion, task2.Status);
        Assert.AreEqual("test", task1.Result);
        Assert.AreEqual("test", task2.Result);
    }
    [Test]
    public void TwoUpdatesToDifferentFiles()
    {
        var scheduler = new TestScheduler();
        var sut = new FileUpdateController<string>(scheduler);
        var task1 = sut.QueueFileUpdate("test1", "1");
        var task2 = sut.QueueFileUpdate("test2", "2");
        Assert.AreNotEqual(TaskStatus.RanToCompletion, task1.Status);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
        Assert.AreEqual(TaskStatus.RanToCompletion, task1.Status);
        Assert.AreEqual(TaskStatus.RanToCompletion, task2.Status);
        Assert.AreEqual("test1", task1.Result);
        Assert.AreEqual("test2", task2.Result);
    }
    [Test]
    public void UpdatingDataOnPendingFileWorks()
    {
        var scheduler = new TestScheduler();
        var sut = new FileUpdateController<string>(scheduler);
        var task1 = sut.QueueFileUpdate("test", "1");
        var task2 = sut.QueueFileUpdate("test", "2");
        var task3 = sut.QueueFileUpdate("test", "3");
        Assert.AreSame(task2, task3);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
        Assert.AreEqual(TaskStatus.RanToCompletion, task1.Status);
        Assert.AreNotEqual(TaskStatus.RanToCompletion, task2.Status);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
        Assert.AreEqual(TaskStatus.RanToCompletion, task2.Status);
    }
}

相关内容

  • 没有找到相关文章

最新更新