好的,所以我目前正在尝试创建一个文件写入队列。每个文件一次只能写入一个。我有一个控制器,它接受要写入的文件名和数据字节数组,并在写入完成时返回一个Task。
问题是我不需要写得太多。我希望对给定文件的每个请求都被放入一个"bucket"中,只写最近的请求,忽略其他请求。我会称之为"队列",但能够从末尾删除项目与队列的定义背道而驰(至少在我看来)。
有没有一种简单的方法可以用Observable做到这一点?让我试着把台阶原子化。
- 接受字符串和字节数组
- 如果字符串中有正在处理的项,请将Task放在"deck"上
- 如果字符串有一个项"在甲板上",则使用新数据更新该项
我的第二步和第三步有问题。我尝试了一个简单的按文件名切换,但我遇到了一个问题,即它不等待已经运行的文件完成后再启动。它也会立即返回,而不是等待新项目完成。
如果您能提供任何帮助,我们将不胜感激!
我提出了这个实现,它允许对不同文件进行并发更新,并利用Rx调度器实现简单快速的可测试性。UpdateFile例程只会休眠5个虚拟秒,但您可以针对实际情况进行更改。
我返回的不是普通的Task
,而是Task<string>
,其中的结果是更新的文件名。我还为更新负载使用了一个通用类型TData
来支持测试。
我确信在效率上还有改进的空间,但这似乎是一个正确的、相当有效的实现:
public class FileUpdateController<TData>
{
private readonly object _gate = new object();
private readonly Dictionary<string, TaskCompletionSource<string>> _pendingTasks;
private readonly Dictionary<string, TaskCompletionSource<string>> _runningTasks;
private readonly Dictionary<string, TData> _dataCache;
private readonly IScheduler _scheduler;
public FileUpdateController(IScheduler scheduler)
{
if (scheduler == null) scheduler = ThreadPoolScheduler.Instance;
_scheduler = scheduler;
_pendingTasks = new Dictionary<string, TaskCompletionSource<string>>();
_runningTasks = new Dictionary<string, TaskCompletionSource<string>>();
_dataCache = new Dictionary<string, TData>();
}
public Task<string> QueueFileUpdate(string filename, TData data)
{
lock (_gate)
{
if (!_pendingTasks.ContainsKey(filename))
{
var tcs = new TaskCompletionSource<string>(filename);
_pendingTasks.Add(filename, tcs);
}
var task = _pendingTasks[filename].Task;
if (!_runningTasks.ContainsKey(filename))
{
MoveToRunning(filename, data);
}
else
{
_dataCache[filename] = data;
}
return task;
}
}
private void MoveToRunning(string filename, TData data)
{
_runningTasks.Add(filename, _pendingTasks[filename]);
_pendingTasks.Remove(filename);
_scheduler.Schedule(() => UpdateFile(filename, data));
}
private async void UpdateFile(string filename, TData data)
{
Console.WriteLine("Updating file " + filename + " with data " + data);
await Observable.Timer(TimeSpan.FromSeconds(5), _scheduler).ToTask();
lock (_gate)
{
var tcs = _runningTasks[filename];
_runningTasks.Remove(filename);
if (_pendingTasks.ContainsKey(filename))
{
var cachedData = _dataCache[filename];
_dataCache.Remove(filename);
MoveToRunning(filename, cachedData);
}
tcs.SetResult(filename);
Console.WriteLine("Updated file " + filename + " with data " + data);
}
}
}
下面是我的一些测试(使用nuget包rx-testing和nunit)。他们确实缺乏测试是否更新了正确数据的能力,但你可以期待测试控制台的输出:
public class FileUpdateControllerTests
{
[Test]
public void SingleUpdate()
{
var scheduler = new TestScheduler();
var sut = new FileUpdateController<string>(scheduler);
var task = sut.QueueFileUpdate("test", "1");
Assert.AreNotEqual(TaskStatus.RanToCompletion, task.Status);
scheduler.Start();
Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);
Assert.AreEqual("test", task.Result);
}
[Test]
public void TwoUpdatesToSameFile()
{
var scheduler = new TestScheduler();
var sut = new FileUpdateController<string>(scheduler);
var task1 = sut.QueueFileUpdate("test", "1");
var task2 = sut.QueueFileUpdate("test", "2");
Assert.AreNotEqual(TaskStatus.RanToCompletion, task1.Status);
scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
Assert.AreNotEqual(TaskStatus.RanToCompletion, task2.Status);
Assert.AreEqual(TaskStatus.RanToCompletion, task1.Status);
scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 2);
Assert.AreEqual(TaskStatus.RanToCompletion, task2.Status);
Assert.AreEqual("test", task1.Result);
Assert.AreEqual("test", task2.Result);
}
[Test]
public void TwoUpdatesToDifferentFiles()
{
var scheduler = new TestScheduler();
var sut = new FileUpdateController<string>(scheduler);
var task1 = sut.QueueFileUpdate("test1", "1");
var task2 = sut.QueueFileUpdate("test2", "2");
Assert.AreNotEqual(TaskStatus.RanToCompletion, task1.Status);
scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
Assert.AreEqual(TaskStatus.RanToCompletion, task1.Status);
Assert.AreEqual(TaskStatus.RanToCompletion, task2.Status);
Assert.AreEqual("test1", task1.Result);
Assert.AreEqual("test2", task2.Result);
}
[Test]
public void UpdatingDataOnPendingFileWorks()
{
var scheduler = new TestScheduler();
var sut = new FileUpdateController<string>(scheduler);
var task1 = sut.QueueFileUpdate("test", "1");
var task2 = sut.QueueFileUpdate("test", "2");
var task3 = sut.QueueFileUpdate("test", "3");
Assert.AreSame(task2, task3);
scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
Assert.AreEqual(TaskStatus.RanToCompletion, task1.Status);
Assert.AreNotEqual(TaskStatus.RanToCompletion, task2.Status);
scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks + 1);
Assert.AreEqual(TaskStatus.RanToCompletion, task2.Status);
}
}