我有以下C#代码。
var databaseRestore = new Microsoft.SqlServer.Management.Smo.Restore();
//databaseRestore.PercentComplete += CompletionStatusInPercent;
//databaseRestore.PercentCompleteNotification = 10;
//databaseRestore.Complete += Restore_Completed;
...
var complete = Observable
.FromEventPattern(databaseRestore, "Complete")
.Select(x=>x.EventArgs as ServerMessageEventArgs)
.Select(x=>x.Error.Message)
.Take(1)
.DumpLive("Complete");
var percentComplete = Observable
.FromEventPattern(databaseRestore, "PercentComplete")
.Select(x=>x.EventArgs as PercentCompleteEventArgs)
.Select(x=>x.Percent)
.TakeUntil(complete)
.DumpLive("PercentComplete");
...
databaseRestore.SqlRestore(server);
如果我这样运行它,首先总是来自处理程序的输出(如果我取消对它们的注释)。
然后,首先,Linqpad显示带有的"Live Observables"结果选项卡
- "完成"可观察,已完成,并带有最终结果消息
- "PercentComplete",仍在等待,并带有"-"(但没有条目?)
我想要的只是使用反应性外延来摆脱事件。首先应该是根据实际进度更新的"PercentComplete"可观察结果。然后用最后的信息"完成"。
问题是:如何正确地设置可观察性?
LINQPad的DumpLive
方法使用WPF进行渲染,因此如果主线程被阻塞,它将无法工作。
相反,您可以编写自己版本的DumpLive,使用HTML进行渲染。这会更慢(因为每次可观察到的发布值时,它都必须更新HTMLDOM),但无论主线程是否被阻塞,它都能工作。
这是代码:
IDisposable DumpLatest<T> (IObservable<T> obs)
{
var dc = new DumpContainer ().Dump();
var extensionToken = Util.GetQueryLifeExtensionToken();
return obs.Subscribe (
value => dc.Content = value,
ex => { dc.Content = ex; extensionToken.Dispose(); },
() => extensionToken.Dispose());
}
如果您在My Extensions中将其定义为扩展方法,您将能够在需要时调用它。
我猜这是DumpLive()在主线程上使用Dispatcher的错误。
您是在主线程上运行数据库恢复,还是阻塞主线程等待完成信号?
我做了一些实验,用DumpLive()在后台线程上运行了Observable——如果主线程被阻止(例如,用Wait或thread.Sleep),UI就不会更新。
即使它在不同的线程上,DumpLive()似乎也只能更新主线程上的UI。我猜它需要在与主线程关联的Dispatcher上呈现更新。
当使用DumpLive时,LINQPad在可观测性完成之前不会终止执行,所以如果您在主线程上有任何阻塞代码,请删除它,看看它是否有效。
这是我的实验代码。只要在下面的例子中添加DumpLive()
子句,就会看到阻塞行为:
void Main()
{
Task.Run(() => {
var progressor = new Progressor();
var complete = Observable
.FromEventPattern(
h => progressor.Complete += h,
h => progressor.Complete -= h)
.Select(_ => "Complete")
.Take(1);
// append this to see blocking
//.DumpLive();
complete.Subscribe(Console.WriteLine);
var percentComplete = Observable
.FromEventPattern<PercentCompleteEventArgs>(
h => progressor.PercentComplete += h,
h => progressor.PercentComplete -= h)
.TakeUntil(complete)
.Select(i => "PercentComplete " + i.EventArgs.PercentComplete);
// append this to see blocking
// .DumpLive();
percentComplete.Subscribe(Console.WriteLine);
});
Thread.Sleep(5000);
}
public class Progressor
{
public Progressor()
{
Observable.Interval(TimeSpan.FromSeconds(1)).Take(10)
.Subscribe(
i => RaisePercentComplete(((int)i+1) * 10),
() => RaiseComplete());
}
private void RaiseComplete()
{
var temp = Complete;
if(temp != null)
{
temp(this, EventArgs.Empty);
}
}
private void RaisePercentComplete(int percent)
{
var temp = PercentComplete;
if(temp != null)
{
temp(this, new PercentCompleteEventArgs(percent));
}
}
public event EventHandler Complete;
public event EventHandler<PercentCompleteEventArgs> PercentComplete;
}
public class PercentCompleteEventArgs : EventArgs
{
public int PercentComplete { get; private set; }
public PercentCompleteEventArgs(int percent)
{
PercentComplete = percent;
}
}