什么是 rx 的 Observable.FromEventPattern 的 TPL 等价物?



在rx中,您可以写入:

var oe = Observable.FromEventPattern<SqlNotificationEventArgs>(sqlDep, "OnChange");

然后订阅observable以将sqlDep对象上的OnChange事件转换为observable。

类似地,如何使用任务并行库从C#事件创建任务?

编辑:澄清Drew指出并由user375487明确编写的解决方案适用于单个事件。任务一完成。。。好吧,它完成了。

可观察事件能够在任何时候再次触发。它可以被看作是一个可观察的流。TPL数据流中的一种ISourceBlock。但是在医生http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx没有ISourceBlock的例子。

我最终找到了一个论坛帖子,解释了如何做到这一点:http://social.msdn.microsoft.com/Forums/en/tpldataflow/thread/a10c4cb6-868e-41c5-b8cf-d122b514db0e

公共静态ISourceBlock CreateSourceBlock(Action、Action、Action,ISourceBlock>executor){var bb=新的BufferBlock();executor(t=>bb.Post(t),()=>bb.Complete(),e=>bb.Fault(e),bb);返回bb;}

//Remark the async delegate which defers the subscription to the hot source.
var sourceBlock = CreateSourceBlock<SomeArgs>(async (post, complete, fault, bb) =>
{
    var eventHandlerToSource = (s,args) => post(args);
    publisher.OnEvent += eventHandlerToSource;
    bb.Complete.ContinueWith(_ => publisher.OnEvent -= eventHandlerToSource);
});

我还没有尝试过上面的代码。异步委托和CreateSourceBlock的定义之间可能不匹配。

在TPL中烘焙的事件异步模式(EAP)没有直接的等价物。您需要做的是使用TaskCompletionSource<T>,在事件处理程序中向自己发出信号。查看MSDN上的这一部分,了解使用WebClient::DownloadStringAsync演示模式的示例。

您可以使用TaskCompletionSource。

public static class TaskFromEvent
{
    public static Task<TArgs> Create<TArgs>(object obj, string eventName)
        where TArgs : EventArgs
    {
        var completionSource = new TaskCompletionSource<TArgs>();
        EventHandler<TArgs> handler = null;
        handler = new EventHandler<TArgs>((sender, args) =>
        {
            completionSource.SetResult(args);
            obj.GetType().GetEvent(eventName).RemoveEventHandler(obj, handler);
        });
        obj.GetType().GetEvent(eventName).AddEventHandler(obj, handler);
        return completionSource.Task;
    }
}

示例用法:

public class Publisher
{
    public event EventHandler<EventArgs> Event;
    public void FireEvent()
    {
        if (this.Event != null)
            Event(this, new EventArgs());
    }
}
class Program
{
    static void Main(string[] args)
    {
        Publisher publisher = new Publisher();
        var task = TaskFromEvent.Create<EventArgs>(publisher, "Event").ContinueWith(e => Console.WriteLine("The event has fired."));
        publisher.FireEvent();
        Console.ReadKey();
    }
}

EDIT根据您的澄清,以下是如何使用TPL DataFlow实现目标的示例。

public class EventSource
{
    public static ISourceBlock<TArgs> Create<TArgs>(object obj, string eventName)
        where TArgs : EventArgs
    {
        BufferBlock<TArgs> buffer = new BufferBlock<TArgs>();
        EventHandler<TArgs> handler = null;
        handler = new EventHandler<TArgs>((sender, args) =>
        {
            buffer.Post(args);
        });
        buffer.Completion.ContinueWith(c =>
            {
                Console.WriteLine("Unsubscribed from event");
                obj.GetType().GetEvent(eventName).RemoveEventHandler(obj, handler);
            });
        obj.GetType().GetEvent(eventName).AddEventHandler(obj, handler);
        return buffer;
    }
}
public class Publisher
{
    public event EventHandler<EventArgs> Event;
    public void FireEvent()
    {
        if (this.Event != null)
            Event(this, new EventArgs());
    }
}
class Program
{
    static void Main(string[] args)
    {
        var publisher = new Publisher();
        var source = EventSource.Create<EventArgs>(publisher, "Event");
        source.LinkTo(new ActionBlock<EventArgs>(e => Console.WriteLine("New event!")));
        Console.WriteLine("Type 'q' to exit");
        char key = (char)0;
        while (true)
        {
            key = Console.ReadKey().KeyChar;             
            Console.WriteLine();
            if (key == 'q') break;
            publisher.FireEvent();
        }
        source.Complete();
        Console.ReadKey();
    }
}

相关内容

  • 没有找到相关文章

最新更新