ReactiveX:使 Observable.Create() 只被调用一次



我正在尝试使用 ReactiveX(更准确地说,Rx.Net)和 SQLite.Net 构建数据访问层。

部分作业是创建一个返回数据库连接的可观察量,以便仅在需要时可以延迟打开它。这是我到目前为止想出的:

var connection = Observable.Create<SQLiteConnection>(observer =>
{
Debug.WriteLine("CheckInStore: Opening database connection");
var database = new SQLiteConnection(configuration.ConnectionString.DatabasePath);
observer.OnNext(database);
observer.OnCompleted();
return Disposable.Create(() =>
{
Debug.WriteLine("CheckInStore: Closing database connection");
database.Close();
});
});

// Further down the line, a query would look like this:
var objects = connection.SelectMany(db => db.Query<>("select * from MyTable"));

不幸的是,每次有人订阅此可观察量时,都会创建一个新连接。一旦订阅被处置,它也会关闭。

我尝试使用.Replay(1).RefCount(),但它没有改变任何东西。无论如何,我不确定是否理解整个RefCount的事情。

如何使此数据库连接成为单一实例?

看看这段代码,它是等效的,但不打开数据库连接:

var conn = Observable.Create<int>(o =>
{
Debug.WriteLine("Opening");
o.OnNext(1);
o.OnCompleted(); //This forces closing code to be called. Comment me out.
return Disposable.Create(() =>
{
Debug.WriteLine("Closing");
});
})
//.Replay(1)
//.RefCount() //.Replay(1).RefCount is necessary if you want to cache the result
;
var sub1 = conn.SelectMany(i => Observable.Return(i)).Subscribe(i => Debug.WriteLine($"1: {i}"));
var sub2 = conn.SelectMany(i => Observable.Return(i)).Subscribe(i => Debug.WriteLine($"2: {i}"));
sub1.Dispose();
sub2.Dispose();
var sub3 = conn.SelectMany(i => Observable.Return(i)).Subscribe(i => Debug.WriteLine($"3: {i}"));
sub3.Dispose();

这里有许多问题:

  1. 每次您取消订阅或完成可观察量时,都会调用您的处置/取消订阅代码。由于您正在调用OnCompleted,因此每次都会打开/关闭。
  2. 如果要重复使用同一连接,则需要使用.Replay(1).RefCount()Observable.Create每次订阅者连接时都运行整个函数,则没有任何内容(除了.Replay(1).Refcount())为您缓存它。
  3. 即使您添加.Replay(1).Refcount()并删除OnCompleted,如果没有未完成的订阅(例如在sub2.Dispose()调用之后),您仍将获得处置(意味着数据库关闭)行为。
  4. 如果您不通过using(var sub = connection.SelectMany(...))或显式通过sub.Dispose()释放订阅,您将永远不会取消订阅,因为此可观察量无法终止。换句话说,与3相反的问题,你的Close代码永远不会发生。

我希望你明白:这是一种非常容易出错的做事方式。我建议使用简单的迭代调用,因为无论如何,这往往更适合数据库调用。如果您坚持使用 RX,我会查看您的数据库连接初始化Observable.Using

相关内容

  • 没有找到相关文章

最新更新