如何在服务器端取消订阅观察者



我有一个应用程序,它通过一个服务器(可观察对象)和许多客户端(观察者)远程访问使用RX。我的问题是,当客户端(观察者)断开连接没有执行订阅(处置),OnNext()函数在服务器上开始抛出远程异常。

是否有任何机制可以在服务器端取消订阅有问题的观察者?

客户端代码部分:

internal void SetRemoting(bool refreshInstance)
{
    string channelName = "RemotingClientUI";
    IDictionary dict = new Hashtable();
    dict["port"] = 9988;
    dict["name"] = channelName;
    var bcp = new BinaryClientFormatterSinkProvider();
    var channel = new TcpClientChannel(dict, bcp);
    ChannelServices.RegisterChannel(channel, false);
    _remoteServer = (IRemoteServerService) Activator.
        GetObject(typeof (IRemoteServerService),
            tcp://...");
}
private void SubscribeToRemoteEvents(bool unSubscrubeFirst)
{            
    _jobRowUpdate = _remoteServer.JobRowUpdate.Subscribe(UpdateJobQueueRow);
    _packageRowUpdate = _remoteServer.PackageRowUpdate.   
        Subscribe(UpdatePackageQueueRow);
    _miscUpdate = _remoteServer.MiscAction.Subscribe(MiscRemoteActions);
}

服务端代码部分:

public class RemoteServiceService
{
    public RemoteServiceService()
    {
        JobRowUpdate = LoggerFactory.GetLogger(
            LoggerType.RemoteService, this).JobRowUpdate.Remotable();
        PackageRowUpdate = LoggerFactory.GetLogger(
            LoggerType.RemoteService, this).PackageRowUpdate.Remotable();
        MiscAction = LoggerFactory.GetLogger(
            LoggerType.RemoteService, this).MiscActions.Remotable();
    }
}
public class RemoteLoggerForService
{
    private RemoteLoggerForService(IService service)
    {
        _jobRowUpdate = new Subject<IJobQueueRow>();
        _packageRowUpdate = new Subject<IPackageQueueRow>();
        _miscActions = new Subject<MiscRemoteObjects>();
        _service = service;
    }
    #region Overrides of LoggerBase
    public override void WriteToLog<T>(T stringFormatOrObject, 
        params object[] args)
    {
        lock (this)
        try
        {
            lock (LockLogger)
            {
                if (stringFormatOrObject is IJobQueueRow && 
                    _jobRowUpdate != null)
                {
                    _jobRowUpdate.OnNext(
                        stringFormatOrObject as IJobQueueRow);
                }
                if (stringFormatOrObject is IPackageQueueRow && 
                    _packageRowUpdate != null)
                {
                    _packageRowUpdate.OnNext(
                        stringFormatOrObject as IPackageQueueRow);
                }
                if (stringFormatOrObject is MiscRemoteObjects && 
                    _miscActions != null)
                {
                    _miscActions.OnNext(
                        stringFormatOrObject as MiscRemoteObjects);
                }
            }
        }
        catch(Exception ex)
        {
            LoggerFactory.GetLogger(LoggerType.File, null).
                WriteToLog(
                    Utils.GetFullException("RemoteLoggerForService", ex));
        }
    }
    #endregion
}

我认为你要做的是吞下异常,让你的服务器继续运行。你可以通过捕获异常(就像你在这里所做的那样)来做到这一点,然后你可以把它传递给相关Observable的OnError方法,这样订阅者就可以选择如何做出反应。

相关内容

  • 没有找到相关文章

最新更新