我有一个应用程序,它通过一个服务器(可观察对象)和许多客户端(观察者)远程访问使用RX。我的问题是,当客户端(观察者)断开连接没有执行订阅(处置),OnNext()函数在服务器上开始抛出远程异常。
是否有任何机制可以在服务器端取消订阅有问题的观察者?
客户端代码部分:
internal void SetRemoting(bool refreshInstance)
{
string channelName = "RemotingClientUI";
IDictionary dict = new Hashtable();
dict["port"] = 9988;
dict["name"] = channelName;
var bcp = new BinaryClientFormatterSinkProvider();
var channel = new TcpClientChannel(dict, bcp);
ChannelServices.RegisterChannel(channel, false);
_remoteServer = (IRemoteServerService) Activator.
GetObject(typeof (IRemoteServerService),
tcp://...");
}
private void SubscribeToRemoteEvents(bool unSubscrubeFirst)
{
_jobRowUpdate = _remoteServer.JobRowUpdate.Subscribe(UpdateJobQueueRow);
_packageRowUpdate = _remoteServer.PackageRowUpdate.
Subscribe(UpdatePackageQueueRow);
_miscUpdate = _remoteServer.MiscAction.Subscribe(MiscRemoteActions);
}
服务端代码部分:
public class RemoteServiceService
{
public RemoteServiceService()
{
JobRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).JobRowUpdate.Remotable();
PackageRowUpdate = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).PackageRowUpdate.Remotable();
MiscAction = LoggerFactory.GetLogger(
LoggerType.RemoteService, this).MiscActions.Remotable();
}
}
public class RemoteLoggerForService
{
private RemoteLoggerForService(IService service)
{
_jobRowUpdate = new Subject<IJobQueueRow>();
_packageRowUpdate = new Subject<IPackageQueueRow>();
_miscActions = new Subject<MiscRemoteObjects>();
_service = service;
}
#region Overrides of LoggerBase
public override void WriteToLog<T>(T stringFormatOrObject,
params object[] args)
{
lock (this)
try
{
lock (LockLogger)
{
if (stringFormatOrObject is IJobQueueRow &&
_jobRowUpdate != null)
{
_jobRowUpdate.OnNext(
stringFormatOrObject as IJobQueueRow);
}
if (stringFormatOrObject is IPackageQueueRow &&
_packageRowUpdate != null)
{
_packageRowUpdate.OnNext(
stringFormatOrObject as IPackageQueueRow);
}
if (stringFormatOrObject is MiscRemoteObjects &&
_miscActions != null)
{
_miscActions.OnNext(
stringFormatOrObject as MiscRemoteObjects);
}
}
}
catch(Exception ex)
{
LoggerFactory.GetLogger(LoggerType.File, null).
WriteToLog(
Utils.GetFullException("RemoteLoggerForService", ex));
}
}
#endregion
}
我认为你要做的是吞下异常,让你的服务器继续运行。你可以通过捕获异常(就像你在这里所做的那样)来做到这一点,然后你可以把它传递给相关Observable的OnError
方法,这样订阅者就可以选择如何做出反应。