Enabling an async TransactionScope without TransactionScopeAsyncFlowOption.Enabled



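Below is an asynchronous cache and database update wrapped in a TransactionScope. I cannot use TransactionScopeAsyncFlowOption.Enabled, introduced in .NET 4.5.1, because the Apache Ignite.NET cache I am using does not support it. I tried to work around this by capturing the current SynchronizationContext and then explicitly calling its Send method to complete the transaction, but that does not work: I still get the error "Transaction scope must be disposed on same thread it was created".

Any suggestions on how to implement the async update? One of the suggestions from Apache Ignite support is to use something like:

Task.WhenAll(cacheUpdate, databaseUpdate).Wait(), but that makes the asynchronous code synchronous, so it is not one of the best options.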

public async Task Update()
{
    // Capture Current Synchronization Context
    var sc = SynchronizationContext.Current;
    TransactionOptions tranOptions = new TransactionOptions();
    tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;

    using (var ts = new TransactionScope())
    {
        // Do Cache Update Operation as Async
        Task cacheUpdate = // Update Cache Async
        // Do Database Update Operation as Async
        Task databaseUpdate = // Update Database Async
        await Task.WhenAll(cacheUpdate, databaseUpdate);
        sc.Send(new SendOrPostCallback(
            o =>
            {
                ts.Complete();
            }), sc);
    }
}
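
For comparison, the blocking variant suggested by Apache Ignite support might look roughly like the sketch below. UpdateCacheAsync and UpdateDatabaseAsync are hypothetical stand-ins (simple delays), not Ignite.NET or database APIs. Because nothing is awaited inside the using block, the scope is created, completed and disposed on the calling thread, at the cost of blocking that thread.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

public static class BlockingUpdateSketch
{
    // Hypothetical stand-ins for the real cache and database calls.
    static async Task UpdateCacheAsync() => await Task.Delay(50).ConfigureAwait(false);
    static async Task UpdateDatabaseAsync() => await Task.Delay(50).ConfigureAwait(false);

    // Returns true when the scope was created and disposed on the same thread.
    public static bool Update()
    {
        int startThread = Environment.CurrentManagedThreadId;
        using (var ts = new TransactionScope())
        {
            Task cacheUpdate = UpdateCacheAsync();
            Task databaseUpdate = UpdateDatabaseAsync();
            // Wait() blocks the calling thread instead of awaiting,
            // so execution never resumes on a different thread.
            Task.WhenAll(cacheUpdate, databaseUpdate).Wait();
            ts.Complete();
        }
        return startThread == Environment.CurrentManagedThreadId;
    }

    public static void Main()
    {
        Console.WriteLine(Update());
    }
}
```

The stubs use ConfigureAwait(false) so that Wait() cannot deadlock when called from a thread with a single-threaded SynchronizationContext; real operations would need the same care.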

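After a lot of searching through blogs and articles, I found the following blog post by Stephen Toub, which shows how to make the continuation of an async method run on exactly the same thread, thereby avoiding the transaction scope problem. Now I do not need TransactionScopeAsyncFlowOption.Enabled to get async methods running inside a TransactionScope.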

https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/

void Main()
{
    // Modified Async Scheduler for Continuations to work on Exactly same thread
    // Required in the case same Thread is required for Task Continuation post await
    Run(async () => await DemoAsync());
    "Main Complete".Dump();
}
static async Task DemoAsync()
{
    // Transaction scope test (the scope must be disposed on the thread that created it)
    using (var ts = new TransactionScope())
    {            
        await Cache + Database Async update
        ts.Complete();
        "Transaction Scope Complete".Dump();
    }   
}
// Run method that uses the single-thread synchronization context, so that we control
// which thread (or set of threads) the continuations run on after an await
public static void Run(Func<Task> func)
{
    // Fetch Current Synchronization context
    var prevCtx = SynchronizationContext.Current;
    try
    {
        // Create SingleThreadSynchronizationContext
        var syncCtx = new SingleThreadSynchronizationContext();
        // Set SingleThreadSynchronizationContext
        SynchronizationContext.SetSynchronizationContext(syncCtx);
        // Execute Func<Task> to fetch the task to be executed
        var t = func();
        // On Continuation complete the SingleThreadSynchronizationContext
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);
        // Ensure that SingleThreadSynchronizationContext run on a single thread
        // Execute a Task and its continuation on same thread
        syncCtx.RunOnCurrentThread();
        // Fetch Result if any
        t.GetAwaiter().GetResult();
    }
    // Reset the Previous Synchronization Context
    finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
}
// Overridden synchronization context, using a BlockingCollection producer / consumer model
// Ensures that the same synchronization context / thread / set of threads is maintained
// In this case we maintain a single thread for continuations after an await
private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // BlockingCollection Consumer Producer Model
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
      m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();
    // Override Post, which is called during Async continuation
    // Send is for Synchronous continuation
    public override void Post(SendOrPostCallback d, object state)
    {
        m_queue.Add(
            new KeyValuePair<SendOrPostCallback, object>(d, state));
    }
    // RunOnCurrentThread takes work items from the BlockingCollection and executes them
    public void RunOnCurrentThread()
    {
        KeyValuePair<SendOrPostCallback, object> workItem;
        while (m_queue.TryTake(out workItem, Timeout.Infinite))
            workItem.Key(workItem.Value);
    }
    // Complete the SynchronizationContext (no more work items will be added)
    public void Complete() { m_queue.CompleteAdding(); }
}
