如何在特定的线程池上调度 C# 异步函数.(提供 Kotlin 示例)



我需要使用 async/await 在 C# 上翻译这个 Kotlin 代码。 我特别感兴趣的是如何创建线程池并在其上执行异步方法。

val myThreadPool1 = newFixedThreadPoolContext(1, "myThreadPool1")
val myThreadPool2 = newFixedThreadPoolContext(1, "myThreadPool2")
fun main() = runBlocking {
f1()
f2()
}
suspend fun f1() {
withContext(myThreadPool1) {
delay(2000)
println(Thread.currentThread().name)
}
}
suspend fun f2() {
withContext(myThreadPool2) {
delay(1000)
println(Thread.currentThread().name)
}
}

我为您/任何人提供了一个例子,介绍如何使用同步上下文在某个线程上继续等待的方法。我不知道你的 C# 经验,但这不是初学者的水平。

在此示例中,您可以看到两个异步方法相互"中断"。

我为此创建了一个要点(有点不同(:ASyncThread.cs

你可以用这个作为参考。

class Program
{
static async void First()
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"{DateTime.Now}| First on Thread: {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000);
}
}
static async void Second()
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"{DateTime.Now}| Second on Thread: {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(500);
}
}
static void Main(string[] args)
{
Console.WriteLine($"Hello World! on Thread: {Thread.CurrentThread.ManagedThreadId}");
using (var myThread = new AwaitEnabledThread())
{
myThread.Post(First);
myThread.Post(Second);
Console.ReadLine();
}
}
}

这将导致:

Hello World! on thread: 1                           
18-10-2019 12:14:34| First on Thread: 4             
18-10-2019 12:14:34| Second on Thread: 4            
18-10-2019 12:14:35| Second on Thread: 4            
18-10-2019 12:14:35| First on Thread: 4             
18-10-2019 12:14:36| Second on Thread: 4            
18-10-2019 12:14:36| Second on Thread: 4            
18-10-2019 12:14:37| First on Thread: 4             
18-10-2019 12:14:37| Second on Thread: 4            
18-10-2019 12:14:37| Second on Thread: 4            
18-10-2019 12:14:38| First on Thread: 4             
18-10-2019 12:14:38| Second on Thread: 4            
18-10-2019 12:14:38| Second on Thread: 4            
18-10-2019 12:14:39| Second on Thread: 4            
18-10-2019 12:14:39| First on Thread: 4             

public class AwaitEnabledThread : SynchronizationContext, IDisposable
{
// By JvanLangen.
private class ActionWithState
{
public SendOrPostCallback Action { get; set; }
public object State { get; set; }
}
private Task _mainTask;
private int _mainTaskThreadId;
private readonly ManualResetEvent _terminate = new ManualResetEvent(false);
private readonly AutoResetEvent _actionAdded = new AutoResetEvent(false);
private readonly ConcurrentQueue<ActionWithState> _actions = new ConcurrentQueue<ActionWithState>();
private void TaskMethod()
{
// because this class derives from SynchronizationContext
SynchronizationContext.SetSynchronizationContext(this); // <-------
_mainTaskThreadId = Thread.CurrentThread.ManagedThreadId;
var waitHandles = new WaitHandle[] { _terminate, _actionAdded };
while (WaitHandle.WaitAny(waitHandles) != 0)
while (_actions.TryDequeue(out var actionWithState))
actionWithState.Action(actionWithState.State);
}

public AwaitEnabledThread()
{
_mainTask = Task.Factory.StartNew(TaskMethod, TaskCreationOptions.LongRunning);
}
public override void Post(SendOrPostCallback d, object state = null)
{
_actions.Enqueue(new ActionWithState { Action = d, State = state });
_actionAdded.Set();
}
public void Post(Action action)
{
Post(s => action(), null);
}
public override void Send(SendOrPostCallback d, object state = null)
{
if (Thread.CurrentThread.ManagedThreadId != _mainTaskThreadId)
{
_actions.Enqueue(new ActionWithState { Action = d, State = state });
_actionAdded.Set();
}
else
d(state);
}
public void Send(Action action)
{
Send(s => action(), null);
}
public void Dispose()
{
_terminate.Set();
_mainTask.Wait();
}
public bool Terminated => _terminate.WaitOne(0);
}

更新:您甚至可以等待排队到线程的操作:

public static Task SendASync(this AwaitEnabledThread thread, SendOrPostCallback d, object state = null)
{
var tcs = new TaskCompletionSource<object>();
thread.Post(s =>
{
try
{
// execute the delegate
d(state);
// return to the previous SynchronizationContext
tcs.SetResult(null);
}
catch (Exception exception)
{
// return to the previous SynchronizationContext
tcs.SetException(exception);
}
}, tcs);
return tcs.Task;
}

用法:

using (var myThread = new AwaitEnabledThread())
{
await myThread.SendASync(First);
// even awaitable actions can be awaited
await myThread.SendASync(async s =>
{
await DoSomethingIO();
await DoSomethingIOAgain();
});

Console.ReadLine();
}

不同之处在于,第一个等待会影响当前线程。方法中的等待在目标线程上完成。

最新更新