为什么异步不允许并行处理,除非我手动创建新的线程?



我编写了一个Windows服务,我希望它使用与当前相同的逻辑工作,但是并行处理所有内容。

真正的代码库是相当抽象和私有的,所以我不能在这里发布源代码,但这里是它的要点。

应用程序是一个持久的进程调度程序。它利用EntityFramework 6扫描数据库,查找详细记录(除其他事项外):1.)运行进程的路径;2.)运行进程的日期/时间;

设置的频率。

基本功能
  1. 它在数据库中循环查找活动记录并返回所有计划的作业详细信息

  2. 在缓冲区内根据当前日期和时间检查日期和时间

  3. 如果作业应该运行,它已经使用new Process().Start(...)和记录中的路径,如果找到文件并且是可执行的,则初始化进程,然后等待退出或配置的超时阈值失效

  4. 每个进程运行的退出码或缺乏退出码(在挂起进程的情况下)单独决定记录是否保持活动并继续在将来动态地循环和重新调度,或者相反,停用并将错误记录到DB中的相关记录中。

  5. 除非明确停止,否则进程将永远继续。

当前并行工作(快1000%,但看起来可能会跳过记录!)也许我需要在访问数据库之前加一个锁?

结果是我是using (var process) {...},它正在抛出它正在被处理。盯着代码看了几天之后,我发现了我在试图保持整洁时犯的这个愚蠢的错误;p

var tasks = new List<Thread>;
schedules.ForEach(schedule => {
// I have also tried ThreadPool.QueueUserWorkerItem(...) but then I read its basically long-hand for Task.Run() and I don't think it was working the same as new Thread using this pattern.
var thread = new Thread(() => await ProcessSchedule(schedule));
// Actually using slim semaphore in wild, but for simplicty sake...
thread.Start();
threads.Add(thread);
});
// Before exiting...
while (!threads.All(instance => !instance.IsAlive))
{
await Delay(debounceValue);
continue;
}

工作顺序没有问题除了它的阻塞慢…

var tasks = new List<Task>;
schedules.ForEach(schedule => {
// I have also tried to just await this here, but that obviously will block anything on the same thread, so I add the tasks to a list a wait for completion after the loop is complete and the processes are working on their own process. 
tasks.Add(ProcessSchedule(schedule));
});
// Before exiting... 
// I expected this to work but it still seems to go record by record :*(
// Also tried using Task.Run(() => await task(...)) with no luck...
await Task.WhenAll(tasks);

注意:我在实际代码中将任务或线程列表传递到另一个级别,因此它可以在一切正常工作时进行处理和等待,但这是一些简化的borderline-psuedo代码,严格用于演示我正在努力的概念,尽可能简洁

ProcessSchedule

内部启动新进程并等待退出的异步方法。当接收到一个实例时,使用计划记录上的EntityFramework 6将成功或退出写入数据库,该计划记录驱动对该实例的流程进行解析和评估。如:

new Process(startInfo).Start();
// Monitor process, persist exit state via 
dbContext.SaveChangesAsync(); 
process.StandardError += handleProcExitListener;
process.StandardOutput += handleProcExitListener;
process.Exited += (...) => handleProcExitListener(...); 

我可以说:

我没有非等待的异步方法,除非它使用await Task.Run(MethodAsync),是在Main(argz)await Task.WhenAll(task);等。

是async-await阻塞我,因为DbContext不是线程安全的默认或什么?如果是这样的话,请有人验证我如何才能实现我正在寻找什么?

我已经尝试了许多技术,但我不能让应用程序同时运行每个进程,然后等待并在生成进程后的结束状态作出反应,除非我使用多线程(新线程直接可能也使用ThreadPool)。

我已经有一段时间没有使用线程了,主要是在c#中引入async-await之后。因此,在没有完全理解原因的情况下,我质疑自己使用它。如果你能帮助我理解我所遗漏的内容,我将非常感激。

在我看来,异步只是一种花哨的模式,可以方便地访问状态机特征。为什么当我研究这个主题时,我只是读到使用ThreadPool.QueueUserWorkerItem(...)自从TPLasync/await以来就相当过时了。如果async/await不给你新的线程工作,是并行运行的进程可能没有它?此外,这些进程的运行时间从10分钟到45分钟不等。所以你可以看到同时运行它们的重要性。

由于我使用的是。net 4.8,不幸的是,我不能使用v5+中引入的WaitForExitAsync()的异步版本。

<<p>解决方案/strong>我从以下异步进程开始并等待它完成建模了一个解决方案
public static Task<bool> WaitForExitAsync(this Process process, TimeSpan timeout)
{
ManualResetEvent processWaitObject = new ManualResetEvent(false);
processWaitObject.SafeWaitHandle = new SafeWaitHandle(process.Handle, false);
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
RegisteredWaitHandle registeredProcessWaitHandle = null;
registeredProcessWaitHandle = ThreadPool.RegisterWaitForSingleObject(
processWaitObject,
delegate(object state, bool timedOut)
{
if (!timedOut)
{
registeredProcessWaitHandle.Unregister(null);
}
processWaitObject.Dispose();
tcs.SetResult(!timedOut);
},
null /* state */,
timeout,
true /* executeOnlyOnce */);
return tcs.Task;
}

即使您省略了一些Process代码,我假设您正在调用阻塞方法Process.WaitForExit而不是它的异步等效。我已经创建了一个模拟类型的解决方案,它并行运行。


private static async Task RunPowershellProcess()
{
using var process = new Process();
process.StartInfo.FileName = @"C:windowssystem32windowspowershellv1.0powershell.exe";
process.StartInfo.UseShellExecute = true;
process.Exited += (a, _) =>
{
var p = a as Process;
Console.WriteLine(p?.ExitCode);
};
process.EnableRaisingEvents = true;
process.Start();
await process.WaitForExitAsync();
}
static async Task Main(string[] args)
{
var tasks = new List<Task>(10);
for (var x = 0; x < 10; x++)
{
tasks.Add(RunPowershellProcess());
}
await Task.WhenAll(tasks);
}

最新更新