我试图公开一个可观察序列,该序列为观察者提供数据库表中的所有现有记录以及任何未来的项目。为了便于讨论,我们假设它是日志项。因此,我会有这样的内容:
public class LogService
{
private readonly Subject<LogEntry> entries;
public LogService()
{
this.entries = new Subject<LogEntry>();
this.entries
.Buffer(...)
.Subscribe(async x => WriteLogEntriesToDatabaseAsync(x));
}
public IObservable<LogEntry> Entries
{
get { return this.entries; }
}
public IObservable<LogEntry> AllLogEntries
{
get
{
// how the heck?
}
}
public void Log(string message)
{
this.entries.OnNext(new LogEntry(message));
}
private async Task<IEnumerable<LogEntry>> GetLogEntriesAsync()
{
// reads existing entries from DB table and returns them
}
private async Task WriteLogEntriesToDatabaseAsync(IList<LogEntry> entries)
{
// writes entries to the database
}
}
我最初对AllLogEntries
实现的想法是这样的:
return Observable.Create<LogEntry>(
async observer =>
{
var existingEntries = await this.GetLogEntriesAsync();
foreach (var existingEntry in existingEntries)
{
observer.OnNext(existingEntry);
}
return this.entries.Subscribe(observer);
});
但是这样做的问题是,可能存在已被缓冲但尚未写入数据库的日志条目。因此,这些条目将被遗漏,因为它们不在数据库中,并且已经通过了entries
可观察对象。
我的下一个想法是将缓冲项与非缓冲项分开,并在实现AllLogEntries
时使用缓冲项:
return Observable.Create<LogEntry>(
async observer =>
{
var existingEntries = await this.GetLogEntriesAsync();
foreach (var existingEntry in existingEntries)
{
observer.OnNext(existingEntry);
}
return this.bufferedEntries
.SelectMany(x => x)
.Subscribe(observer);
});
这里有两个问题:
- 这意味着
AllLogEntries
的客户机在接收日志条目之前也必须等待缓冲区时间间隔通过。我想让他们立即看到日志条目。 - 仍然存在一个竞争条件,在我完成读取现有条目和返回未来条目之间,日志条目可以写入数据库。
所以我的问题是:我该如何在没有竞争条件的情况下实现我的需求,并避免任何重大的性能损失?
要通过客户端代码做到这一点,您可能必须使用轮询实现解决方案,然后查找调用之间的差异。可能将解决方案与
结合使用- Observable.Interval(): http://rxwiki.wikidot.com/101samples#toc28, and
- Observable.DistinctUntilChanged ()
会给你足够的解决方案。
或者,我建议您尝试找到一个解决方案,当DB/表更新时通知客户端。在web应用程序中,您可以使用SignalR之类的东西来完成此操作。
例如:http://techbrij.com/database-change-notifications-asp-net-signalr-sqldependency
如果不是web应用程序,可以使用类似的套接字更新机制。
请参阅以下链接(这些链接来自SignalR轮询数据库更新的可接受答案):
- http://xsockets.net/api/net-c snippet61
- https://github.com/codeplanner/XSocketsPollingLegacyDB