以下代码在事务范围(TransactionScope)内异步更新缓存和数据库。我无法使用 .NET Framework 4.5.1 中引入的 TransactionScopeAsyncFlowOption.Enabled,
因为我使用的 Apache Ignite.NET 缓存不支持它。我尝试了一种变通办法:先捕获当前的 SynchronizationContext,
然后显式调用该 SynchronizationContext 的 Send
方法来完成事务,但这不起作用,我仍然收到错误:“Transaction scope must be disposed on same thread it was created”。
请问有什么办法可以实现异步更新(Async Update)?
Apache Ignite 支持团队给出的建议之一是使用类似下面的写法:
Task.WhenAll(cacheUpdate, databaseUpdate).Wait()
但这会把异步代码变成同步阻塞调用,因此并不是理想的方案。
// Attempted workaround from the question: start the cache update and the database
// update inside a TransactionScope, await both, then complete the scope through the
// synchronization context that was captured before the await.
// NOTE(review): the author reports that this still fails with
// "Transaction scope must be disposed on same thread it was created".
public async Task Update()
{
// Capture the current synchronization context before any await.
// NOTE(review): presumably this can be null when no context is installed, in which
// case the sc.Send call below would fail -- confirm against the hosting environment.
var sc = SynchronizationContext.Current;
// Transaction options requesting RepeatableRead isolation.
// NOTE(review): tranOptions is never passed to the TransactionScope constructor below,
// so this setting has no visible effect within this method.
TransactionOptions tranOptions = new TransactionOptions();
tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;
using (var ts = new TransactionScope())
{
// Start the cache update as an asynchronous operation.
Task cacheUpdate = // Update Cache Async (placeholder: the actual call is omitted in the original)
// Start the database update as an asynchronous operation.
Task databaseUpdate = // Update Database Async (placeholder: the actual call is omitted in the original)
await Task.WhenAll(cacheUpdate, databaseUpdate);
// Hand ts.Complete() to the captured context via Send. sc is also passed as the
// state argument, which the callback does not use.
sc.Send(new SendOrPostCallback(
o =>
{
ts.Complete();
}), sc);
}
}
在查阅了大量博客和文章之后,我找到了 Stephen Toub 的下面这篇博客。它说明了如何让异步方法的延续(continuation)在完全相同的线程上执行,从而避免事务范围的问题。现在我不需要 TransactionScopeAsyncFlowOption.Enabled
就可以让异步方法在 TransactionScope 中运行:
https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/
void Main()
{
    // Drive the demo through Run, which installs a scheduler for continuations so that
    // the code following an await resumes on exactly the same thread. This is needed
    // whenever a task continuation must stay on the thread that started the work.
    Func<Task> demo = async () => await DemoAsync();
    Run(demo);

    "Main Complete".Dump();
}
// Demo body executed through Run from Main: awaits inside a TransactionScope, then
// completes the scope and reports progress.
static async Task DemoAsync()
{
// Transaction scope test: the using block disposes the scope when it exits.
using (var ts = new TransactionScope())
{
// Placeholder for the awaited cache + database update; not valid C# as written.
await Cache + Database Async update
// Reached only after the awaited work has finished.
ts.Complete();
"Transaction Scope Complete".Dump();
}
}
// Runs an asynchronous delegate to completion on the calling thread.
// A SingleThreadSynchronizationContext is installed for the duration of the call, so
// work posted to it after an await is executed by the loop in RunOnCurrentThread,
// i.e. on the thread that called Run. The previously installed context is restored on exit.
public static void Run(Func<Task> func)
{
    // Remember whatever context is installed right now so it can be put back afterwards.
    SynchronizationContext previousContext = SynchronizationContext.Current;
    try
    {
        // Install the single-threaded context before the delegate is invoked.
        SingleThreadSynchronizationContext pump = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(pump);

        // Invoke the delegate to obtain the task representing the asynchronous work.
        Task pending = func();

        // Once the task finishes, mark the context complete so the pump loop below can end.
        pending.ContinueWith(_ => pump.Complete(), TaskScheduler.Default);

        // Execute queued work items on this thread until the context is completed.
        pump.RunOnCurrentThread();

        // Observe the task's outcome (rethrows if the task failed).
        pending.GetAwaiter().GetResult();
    }
    finally
    {
        // Always restore the context that was active when Run was entered.
        SynchronizationContext.SetSynchronizationContext(previousContext);
    }
}
// SynchronizationContext built on a BlockingCollection producer/consumer queue.
// Callbacks handed to Post are queued and later executed, in order, by whichever thread
// calls RunOnCurrentThread; used with a single pumping thread, this keeps the code that
// follows an await on that one thread.
// Send is not overridden here, so the base-class behaviour applies to it.
private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // Queue of (callback, state) pairs: Post produces, RunOnCurrentThread consumes.
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
        m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // Queues a callback for later execution by the pumping thread.
    // d: the callback to run; must not be null.
    // state: the argument passed to the callback when it runs.
    // Throws ArgumentNullException when d is null, so the mistake is reported to the
    // poster instead of surfacing later as a NullReferenceException inside the pump loop.
    // Adding after Complete() has been called is rejected by the underlying collection.
    public override void Post(SendOrPostCallback d, object state)
    {
        if (d == null)
        {
            throw new ArgumentNullException("d");
        }

        m_queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // Executes queued callbacks on the calling thread, blocking while the queue is empty,
    // and returns once Complete() has been called and the queue has been drained.
    public void RunOnCurrentThread()
    {
        foreach (KeyValuePair<SendOrPostCallback, object> workItem in m_queue.GetConsumingEnumerable())
        {
            workItem.Key(workItem.Value);
        }
    }

    // Signals that no more callbacks will be posted, letting RunOnCurrentThread finish.
    public void Complete()
    {
        m_queue.CompleteAdding();
    }
}