Converting an expensive call to async while keeping the event system intact



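I ran into some problems while trying to optimize some legacy code. The overall picture is this: there is an "export engine" that starts a number of "writer" objects depending on the required output. A writer spins up a DataReader object and subscribes to its events so that it can process the data being read. It then starts a long-running "GetData" method in the reader, which retrieves data from a legacy database that takes a long time to respond. The data reader processes the returned values and raises several events that let the writers process the data.

A very simplified pseudocode example of the DataReader is included below.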

class DataReader
{
    // delegates
    internal delegate void DataRowReadHandler(object sender, DataRowReadArgs e);
    internal delegate void DataProgressChangedHandler(object sender, DataProgressChangedArgs e);
    internal delegate void DataReadCompleteHandler(object sender, DataReadCompleteArgs e);

    // events
    internal event DataProgressChangedHandler DataProgressChanged;
    internal event DataReadCompleteHandler DataReadCompleted;
    internal event DataRowReadHandler DataRowRead;

    // this method chomps on and on and raises an event when the database read returns something
    internal void GetData()
    {
        for (int totalrows = 0; totalrows < _cursor.RowCount; totalrows += _maxrows)
        {
            // I want to keep GetRawData running while the data it fetched is being processed
            string[][] rawdata = _cursor.GetRawData(_maxrows);
            // -- a ton of post-processing I want to do while database is being read--
            // and then report progress
            foreach (var row in rawdata)
            {
                DataRowReadArgs rowArgs = new DataRowReadArgs(row.Index);
                OnDataRowRead(rowArgs); // raise event after each row
            }
            DataProgressChangedArgs progressArgs = new DataProgressChangedArgs(batch, counter);
            OnDataProgressChanged(progressArgs); // raise event after each batch of rows
        }
        // report we're done
        DataReadCompleteArgs e = new DataReadCompleteArgs(counter);
        OnDataReadCompleted(e); // done with reading data
    }

    protected virtual void OnDataProgressChanged(DataProgressChangedArgs e)
    {
        DataProgressChangedHandler handler = DataProgressChanged;
        if (handler != null)
            handler(this, e);
    }

    protected virtual void OnDataReadCompleted(DataReadCompleteArgs e)
    {
        DataReadCompleteHandler handler = DataReadCompleted;
        if (handler != null)
            handler(this, e);
    }

    protected virtual void OnDataRowRead(DataRowReadArgs e)
    {
        DataRowReadHandler handler = DataRowRead;
        if (handler != null)
            handler(this, e);
    }
}

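What I want is this: keep the database read (which will be by far the slowest part) running, and process the returned data as query results become available. That is: post-process the data in the reader, have it fire its events, and let the handlers in the writers deal with them, all while the database read continues. Ideally I would also like some kind of cancellation token to stop reading when something goes wrong, but first things first. I don't want to touch the event-based system that a lot of classes depend on; I just want the database read to run in parallel and have the rest of the code respond as results come in.

I have been working with await/async and things like TaskCompletionSource for almost a week now, but I still can't seem to get my head around this. I have come close: I actually managed to build a list of tasks and feed it to an intermediate method that awaits each task and then processes its result once it completes.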

internal async Task GetDataAsync()
{
    IList<Task<string[][]>> tasks = CreateCursorReadTasks();
    var processingTasks = tasks.Select(AwaitAndProcessAsync).ToList();
    await Task.WhenAll(processingTasks);
    // this isn't 'awaited' in the sense I expected
    // also, what order are they performed in? The database is single-threaded, no queues, nothing
    // I need to fire my 'done' event only after all tasks have finished
}

private IList<Task<string[][]>> CreateCursorReadTasks()
{
    IList<Task<string[][]>> retval = new List<Task<string[][]>>();
    for (int totalrows = 0; totalrows < this._cursor.RowCount; totalrows += _maxrows)
    {
        retval.Add(Task.Run(() => _cursor.GetRawData(_maxrows)));
    }
    return retval;
}

internal async Task AwaitAndProcessAsync(Task<string[][]> task)
{
    string[][] rawdata = await task;
    // Do all the post-processing and fire the events like in the GetData method of DataReader
}

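Apart from all of this looking overly complicated, I also ran into two problems: a) the event handlers all appear to be null even though I subscribed to them; b) I don't know where or how to raise the completed event.

My question is: looking at my GetData method in the DataReader class, how would you suggest I make the very expensive database call run asynchronously?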

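Let's take advantage of modern facilities: implement the producer/consumer pattern as a pipeline built on the BlockingCollection class.

In the GetData method, start two tasks: one to fetch the data and one to process it.

You can still use your event system. In the first task, also add the data to the collection, which does not take long.

In the second task, pull the data from the collection and process it. Waiting on the GetConsumingEnumerable method is very efficient: it blocks without spinning until an item is available or adding has been completed.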

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class DataReader
{
    public CancellationTokenSource CTS { get; } = new CancellationTokenSource();

    internal void GetData()
    {
        // Use the desired data type here; it must match what the producer adds below
        var values = new BlockingCollection<DataRowReadArgs>();

        // Producer: reads from the database and queues the results
        var readTask = Task.Factory.StartNew(() =>
        {
            try
            {
                // here your code
                for (...)
                {
                    if (CTS.Token.IsCancellationRequested)
                        break;
                    foreach (var row in rawdata)
                    {
                        DataRowReadArgs args = new DataRowReadArgs(row.Index);
                        //...
                        values.Add(args); // put value to blocking collection
                    }
                }
            }
            catch (Exception e) { /* process possible exception */ }
            finally { values.CompleteAdding(); } // lets the consumer loop end
        }, TaskCreationOptions.LongRunning);

        // Consumer: takes queued results and processes them while the read continues
        var processTask = Task.Factory.StartNew(() =>
        {
            foreach (var value in values.GetConsumingEnumerable())
            {
                if (CTS.Token.IsCancellationRequested)
                    break;
                // process value
            }
        }, TaskCreationOptions.LongRunning);

        Task.WaitAll(readTask, processTask);
    }
}

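You can cancel the tasks at any time. Because GetData blocks until both tasks have finished, Cancel has to be called from another thread (or from an event handler) while GetData is still running:

var dataReader = new DataReader();
var worker = Task.Run(() => dataReader.GetData()); // GetData blocks, so run it off the calling thread
dataReader.CTS.Cancel();
worker.Wait();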

You can use await Task.WhenAll(readTask, processTask); instead of Task.WaitAll.
In that case the method signature should look like this: async Task GetDataAsync()
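
To make the awaitable variant concrete, here is a minimal sketch of how the two tasks, the events and the cancellation token could fit together. It is not the asker's real class: PipelinedReader, FetchBatch and the int event arguments are made-up stand-ins, and the Thread.Sleep call only simulates the slow cursor read.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's DataReader.
class PipelinedReader
{
    public event EventHandler<int> DataRowRead;        // argument: row index
    public event EventHandler<int> DataReadCompleted;  // argument: total rows processed

    private readonly int _batches;
    private readonly int _rowsPerBatch;

    public PipelinedReader(int batches, int rowsPerBatch)
    {
        _batches = batches;
        _rowsPerBatch = rowsPerBatch;
    }

    // Assumption: simulates the slow database call (_cursor.GetRawData in the question).
    private string[][] FetchBatch(int batch)
    {
        Thread.Sleep(50);
        var rows = new string[_rowsPerBatch][];
        for (int i = 0; i < rows.Length; i++)
            rows[i] = new[] { $"batch {batch}, row {i}" };
        return rows;
    }

    public async Task GetDataAsync(CancellationToken token = default)
    {
        using var queue = new BlockingCollection<string[][]>();

        // Producer: the only code touching the database, so reads stay sequential.
        var readTask = Task.Factory.StartNew(() =>
        {
            try
            {
                for (int b = 0; b < _batches; b++)
                {
                    token.ThrowIfCancellationRequested();
                    queue.Add(FetchBatch(b), token);
                }
            }
            finally { queue.CompleteAdding(); } // always let the consumer finish
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        // Consumer: post-processes each batch and raises the per-row event.
        int processed = 0;
        var processTask = Task.Factory.StartNew(() =>
        {
            foreach (var rawdata in queue.GetConsumingEnumerable(token))
                foreach (var row in rawdata)
                    DataRowRead?.Invoke(this, processed++);
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        // Both tasks have finished here, so this is the place for the 'done' event.
        await Task.WhenAll(readTask, processTask);
        DataReadCompleted?.Invoke(this, processed);
    }
}

static class Demo
{
    static async Task Main()
    {
        var reader = new PipelinedReader(batches: 3, rowsPerBatch: 2);
        reader.DataRowRead += (s, index) => Console.WriteLine($"row {index}");
        reader.DataReadCompleted += (s, total) => Console.WriteLine($"done, {total} rows");

        using var cts = new CancellationTokenSource();
        await reader.GetDataAsync(cts.Token);
    }
}
```

Note that in this sketch DataRowRead is raised on the consumer's thread, not on the thread that called GetDataAsync, so handlers that touch UI or other thread-affine state would need to marshal back themselves. If the token is cancelled, awaiting GetDataAsync throws an OperationCanceledException and the completed event is not raised.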

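Your pseudocode looks fine. I tried to verify it with a program in which I simulated the database call with Task.Delay(5000) and allowed only one task at a time to access it (to account for the database being single-threaded).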

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    public static async Task Main(string[] args)
    {
        var dataReader = new DataReader();
        dataReader.DataProgressChanged += (s, e) => Log.D($"*** Event - Processed {e.TaskId}");
        dataReader.DataReadCompleted += (s, e) => Log.D("*** Event - Data read complete");
        await dataReader.GetDataAsync();
        Console.ReadKey();
    }
}

public class DataReader
{
    internal delegate void DataProgressChangedHandler(object sender, DataProgressChangedArgs e);
    internal delegate void DataReadCompleteHandler(object sender, DataReadCompleteArgs e);
    internal event DataProgressChangedHandler DataProgressChanged;
    internal event DataReadCompleteHandler DataReadCompleted;

    // Allows only one simulated database read at a time
    private SemaphoreSlim semaphore = new SemaphoreSlim(1);

    internal async Task GetDataAsync()
    {
        Log.D("Start");
        var tasks = CreateCursorReadTasks();
        var processingTasks = tasks.Select(AwaitAndProcessAsync).ToList();
        await Task.WhenAll(processingTasks);
        // All reads and all post-processing are done at this point
        OnDataReadCompleted(new DataReadCompleteArgs());
    }

    private IList<ReadTaskWrapper> CreateCursorReadTasks()
    {
        var retval = new List<ReadTaskWrapper>();
        for (int totalrows = 0; totalrows < 4; totalrows++)
        {
            int taskId = totalrows; // copy so each lambda captures its own value
            retval.Add(new ReadTaskWrapper
            {
                Task = Task.Run(() => SimulateDbReadAsync(taskId)),
                Id = taskId
            });
        }
        return retval;
    }

    private async Task<string[][]> SimulateDbReadAsync(int taskId)
    {
        await semaphore.WaitAsync();
        try
        {
            Log.D($"Starting data read task {taskId}");
            await Task.Delay(5000);
            Log.D($"Finished data read task {taskId}");
        }
        finally
        {
            semaphore.Release(); // release even if the read throws
        }
        return new string[1][];
    }

    internal async Task AwaitAndProcessAsync(ReadTaskWrapper task)
    {
        string[][] rawdata = await task.Task;
        Log.D($"Start postprocessing of task {task.Id}");
        await Task.Delay(3000);
        Log.D($"Finished prostprocessing of task {task.Id}");
        OnDataProgressChanged(new DataProgressChangedArgs { TaskId = task.Id });
    }

    internal void OnDataProgressChanged(DataProgressChangedArgs args)
    {
        DataProgressChanged?.Invoke(this, args);
    }

    internal void OnDataReadCompleted(DataReadCompleteArgs args)
    {
        DataReadCompleted?.Invoke(this, args);
    }

    internal class DataProgressChangedArgs : EventArgs
    {
        public int TaskId { get; set; }
    }

    internal class DataReadCompleteArgs : EventArgs
    {
    }
}

public class ReadTaskWrapper
{
    public int Id { get; set; }
    public Task<string[][]> Task { get; set; }
}

public static class Log
{
    public static void D(string msg)
    {
        Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {msg}");
    }
}

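The output shows that it runs correctly. In the example, 4 tasks are started; each database access takes 5 seconds and the post-processing takes 3 seconds. The total run time is about 23 seconds (4*5+3), which means the post-processing runs in parallel with the database reads. The events also fire as expected. The order in which the tasks execute differs each time the program is run. See the following program output: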

15:54:20: Start
15:54:20: Starting data read task 2
15:54:25: Finished data read task 2
15:54:25: Starting data read task 0
15:54:25: Start postprocessing of task 2
15:54:28: Finished prostprocessing of task 2
15:54:28: *** Event - Processed 2
15:54:30: Finished data read task 0
15:54:30: Starting data read task 3
15:54:30: Start postprocessing of task 0
15:54:33: Finished prostprocessing of task 0
15:54:33: *** Event - Processed 0
15:54:35: Finished data read task 3
15:54:35: Start postprocessing of task 3
15:54:35: Starting data read task 1
15:54:38: Finished prostprocessing of task 3
15:54:38: *** Event - Processed 3
15:54:40: Finished data read task 1
15:54:40: Start postprocessing of task 1
15:54:43: Finished prostprocessing of task 1
15:54:43: *** Event - Processed 1
15:54:43: *** Event - Data read complete
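
The 4*5+3 figure can be checked with a small calculation. The sketch below assumes, as in the demo program, that the reads run strictly one after another and that each batch's post-processing starts the moment its read finishes; the PipelineTiming class and its method names are made up for illustration.

```csharp
using System;

static class PipelineTiming
{
    // Reads are serialized; post-processing of batch i starts when read i ends
    // and may overlap with later reads.
    public static int OverlappedSeconds(int batches, int readSeconds, int postSeconds)
    {
        int finish = 0;
        for (int i = 1; i <= batches; i++)
        {
            int readDone = i * readSeconds; // reads queue up behind each other
            finish = Math.Max(finish, readDone + postSeconds);
        }
        return finish;
    }

    // Baseline: each batch is read and fully post-processed before the next read starts.
    public static int SequentialSeconds(int batches, int readSeconds, int postSeconds)
        => batches * (readSeconds + postSeconds);

    static void Main()
    {
        Console.WriteLine(OverlappedSeconds(4, 5, 3)); // 23
        Console.WriteLine(SequentialSeconds(4, 5, 3)); // 32
    }
}
```

With these numbers the overlapped run matches the 23 seconds between the first and last log lines (15:54:20 to 15:54:43), while a strictly sequential read-then-process loop would need 32 seconds.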

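To investigate further: where in your program do you instantiate the DataReader class, and how do you subscribe to these events? Can you describe in more detail what you mean by "this isn't 'awaited' in the sense I expected"?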
