如何将可观测值序列到云中



我需要在Azure环境中将数据处理器序列(例如如何用.NET RX组织)分为几个计算单元。
这个想法是将可观察到的序列序列化为Azure队列(或服务总线),并将其序列化。
如果生产者或消费者失败,其他方应该能够继续生产/消费。

任何人都可以提出一种优雅的方法,以及要使用什么(Azure队列或服务巴士)?
是否有人使用过TCP可观察的提供商-http://rxx.codeplex.com/wikipage?title=tcp QBServable Provider for to the Party of Aparters a Partion approys安全?

想象您有一个Messaage队列,以下API

class MQ {
    public MQ();
    // send a single message from your message queue
    public void send(string keyPath, string msg);
    // Receive a single message from your message queue
    public async Task<string> receive(keyPath);
}

使此Rx兼容

class MQRX: IObserver<string> {
    MQ _mq;
    string _keyPath
    MQRX(string keyPath){
        _mq = mq;
        _keyPath = keyPath;
    }
    IObservable<string> Observe(){
        return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat();
    }
    void OnNext(string msg){
        _mq.send(msg);
    }
    void OnError(Exception e){
        // The message queue might not
        // support serializing exceptions
        // or it might or you might build
        // a protocol for it.
    }
}

以容错的方式使用它。注意如果有例外,请重试将重新订阅被OnError

传递的上游扔了上游
new MQRX("users/1/article/2").
    Retry().
    Subscribe((msg)=>Console.Writeln(msg));

例如,在写作方面,您可以每两秒钟发送一条消息并重试如果有错误,则订阅发电机。请注意不太可能可观察到的错误。想象一下从文件或其他消息队列阅读。

var mq = new MQRX("users/1/article/2");
Observable.Interval(TimeSpan.FromSeconds(2)).
    Select((x)=>x.ToString()).

注意,您可能应该使用iObservable catch扩展方法而不是盲目重试您可能会一次又一次遇到相同的错误。 重试()。 订阅(MQ);

我用30行VB代码写了自己的UDP给RX包装器,并且可以处理超时。我猜TCP包装器会很相似。

Imports System.Reactive
Imports System.Reactive.Linq
Imports System.Reactive.Threading.Tasks
Imports System.Threading
Imports System.Net
Imports System.Net.Sockets
Public Module UDP
    ''' <summary>
    ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional
    ''' timeout value between results.
    ''' </summary>
    ''' <param name="timeout"></param>
    ''' <returns></returns>
    ''' <remarks></remarks>
    Public Function StreamObserver(
                localPort As Integer, 
                Optional timeout As Nullable(Of TimeSpan) = Nothing
                ) As IObservable(Of UdpReceiveResult)
        Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)(
            Function() New UdpClient(localPort),
            Function(udpClient)
                Dim o = Observable.Defer(Function() udpClient.
                                                                ReceiveAsync().
                                                                ToObservable())
                If Not timeout Is Nothing Then
                    Return Observable.Timeout(o.Repeat(), timeout.Value)
                Else
                    Return o.Repeat()
                End If
            End Function
        )
    End Function
End Module

,如果需要,您可以为书面方向做同样的事情。然后,您只需使用普通的RX技术将数据序列化为UDP帧。

new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)).
        Select( function(udpResult) MyDeserializeFunction(udpResult)).
        Subscribe( sub (result) DoSomething(result), 
                   sub(error) DoSomethingWithError ) 

相关内容

  • 没有找到相关文章