Drop database on drop Using Sqlx and Rust



我正在一步一步地遵循zero2prod Rust书。到目前为止,我已经进入了第3章的最后一部分,其中我们设置了一些集成测试,并在每次测试运行时创建一个数据库

我想扩展这个功能,并添加在每次测试运行后删除数据库的功能。这导致了我的Drop特性,我将在其中执行DROP DATABASE命令并将其称为一天(我将存储db名称及其连接/池,以便我以后可以访问它)。

问题是,应用程序在执行DROP DATABASE查询时挂起,并在60秒后超时。我不知道是什么原因造成的,因为我无法打印或调试连接。

这是我的代码:

use futures::executor;
use sqlx::{Connection, Executor, PgConnection, PgPool};
use std::net::TcpListener;
use uuid::Uuid;
use zero2prod::configuration::{get_configuration, DatabaseSettings};
const BASE_URL: &str = "127.0.0.1";
pub struct TestApp {
db_name: String,
connection_string: String,
pub address: String,
pub db_pool: PgPool,
pub connection: PgConnection,
}
/**
* We need to refactor our project into a library and a binary: all our logic will live in the library crate
while the binary itself will be just an entrypoint with a very slim main function
*/
pub async fn init(url: &str) -> TestApp {
let mut app = spawn_app().await;
app.address = format!("{}{}", app.address, url);
return app;
}
// Launch our application in the background ~somehow~
async fn spawn_app() -> TestApp {
// We take the BASE_URL const and assign it a port 0. We then
// pass the listener to the server
let base_url = format!("{}:0", BASE_URL);
let listener = TcpListener::bind(base_url).expect("Failed to bind random port");
// We retrieve the port assigned by the OS
let port = listener.local_addr().unwrap().port();
let (connection, db_connection, db_name, connection_string) = init_db().await;
// We pass the port now to our server
let server = zero2prod::run(listener, db_connection.clone()).expect("Failed to bind address");
let _ = actix_web::rt::spawn(server);
let address = format!("http://{}:{}", BASE_URL, port);
TestApp {
db_name: String::from(db_name),
address,
db_pool: db_connection,
connection,
connection_string,
}
}
async fn init_db() -> (PgConnection, PgPool, String, String) {
let mut configuration = get_configuration().expect("Failed to read configuration");
// We change the db name in each run as we need to run the test multiple times
configuration.database.database_name = Uuid::new_v4().to_string();
let (connection, pool) = configure_database(&configuration.database).await;
return (
connection,
pool,
String::from(&configuration.database.database_name),
configuration.database.connection_string_without_db(),
);
}
async fn configure_database(config: &DatabaseSettings) -> (PgConnection, PgPool) {
// The following returns:
//   format!(
//       "postgres://{}:{}@{}:{}",
//       self.username, self.password, self.host, self.port
//   )
let mut connection = PgConnection::connect(&config.connection_string_without_db())
.await
.expect("Failed to connect to Postgres.");
connection
.execute(format!(r#"CREATE DATABASE "{}""#, config.database_name).as_str())
.await
.expect("Failed to create the db");
// Migrate the database
let connection_pool = PgPool::connect(&config.connection_string())
.await
.expect("Failed to connect to Postgres");
sqlx::migrate!("./migrations")
.run(&connection_pool)
.await
.expect("Failed to migrate db");
return (connection, connection_pool);

入口点是init()函数,它基本上返回一个TestApp结构体(最初包含db_pooladdress字段)。上面的代码都可以正常运行。

问题在下面。这是我尝试过的所有内容:

  1. 使用Smol的运行时来运行async in drop -尝试初始化到Postgres数据库的新连接
impl Drop for TestApp {
fn drop(&mut self) {
smol::block_on(async {
let mut connection = PgConnection::connect(&self.connection_string)
.await
.expect("Failed to connect to Postgres.");
let result = connection
.execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str())
.await
.expect("Error while querying the drop database");
println!("{:?}", result);
});
}
}
  1. 使用sml的运行时运行async in drop -尝试使用现有的db_pool
fn drop(&mut self) {
smol::block_on(async {
let result = self
.db_pool
.execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str())
.await.expect("Error while querying");
println!("{:?}", result);
});
}
  1. 使用Future的crate执行器-使用现有的db_pool
let result = executor::block_on(
self.db_pool
.execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str()),
)
.expect("Failed to drop database");
println!("{:?}", result);
  1. 使用Future的crate执行器-运行db_pool.acquire(),然后运行池(这挂在db_pool.acquire.
executor::block_on(self.db_pool.acquire()).expect("Failed to acquire pool");
let result = executor::block_on(
self.db_pool
.execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str()),
)
.expect("Failed to drop database");
println!("{:?}", result);
  1. 使用Future的crate执行器-运行现有连接
let result = executor::block_on(
self.connection
.execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str()),
)
.expect("Failed to drop database");
println!("{:?}", result);

请注意,代码不是最漂亮的,因为我试图找到一个工作的解决方案。

不幸的是,我不知道问题是什么,因为没有抛出错误。

任何想法?

sqlx>0.6.1 update

@Ziliang Lin的解决方案确实更加清晰,现在sqlx::test在sqlx中得到了支持。我的实现是这样的:

// Start the app
async fn spawn_app(pool: Pool<Postgres>) -> TestApp {
let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
let port = listener.local_addr().unwrap().port();
let address = format!("http://127.0.0.1:{}", port);
let server = run(listener, pool.clone()).expect("Failed to bind address");
let _ = tokio::spawn(server);
TestApp {
address,
db_pool: pool,
}
}
// sqlx::test will automatically create the database, run migrations,
// and delete the database when the test is complete
#[sqlx::test]
async fn health_check_works(pool: Pool<Postgres>) {
let app = spawn_app(pool).await;
let client = reqwest::Client::new();
// Act
let response = client
// Use the returned application address
.get(&format!("{}/health_check", &app.address))
.send()
.await
.expect("Failed to execute request.");
// Assert
assert!(response.status().is_success());
assert_eq!(Some(0), response.content_length());
}

tokio::test->sqlx::test,删除configure_database函数,在test函数中增加pool: Pool<Postgres>参数

原始文章

我遇到了同样的问题。由于这里的一些答案,我最终得到了以下解决方案(注意,我为以&;test_db-&;开头的数据库名称添加了一个断言,因此您必须在数据库名称前加上或删除断言):

pub struct TestApp {
pub address: String,
pub db_pool: PgPool,
pub db_name: String,
pub db_connection_string: String,
}
impl TestApp {
async fn terminate(&mut self) {
assert!(self.db_name.starts_with("test_db-"));
println!("Cleaning up database: {}", self.db_name);
self.db_pool.close().await;
let mut connection = PgConnection::connect(&self.db_connection_string)
.await
.expect("Failed to connect to Postgres");
// Force drop all active connections to database
// TODO: see if there is a softer way to handle this (i.e. close connection when DB access is complete)
connection
.execute(
format!(
r#"
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '{}'
AND pid <> pg_backend_pid()
"#,
self.db_name
)
.as_str(),
)
.await
.expect("Failed to terminate current connections to test db");
connection
.execute(format!(r#"DROP DATABASE "{}";"#, self.db_name).as_str())
.await
.expect("Failed to drop database.");
println!("Database cleaned up successfully.")
}
}
impl Drop for TestApp {
fn drop(&mut self) {
std::thread::scope(|s| {
s.spawn(|| {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(self.terminate());
});
});
}
}

对于以后遇到这个问题的人,我建议看一下sqlx的sqlx::test

这个过程宏将自动实现OP想要做的事情:

  1. 为每个集成测试创建一个由[sqlx::test]
  2. 注释的数据库
  3. 在测试运行后删除数据库

下面是我的代码示例:

#[sqlx::test]
async fn confirmations_without_token_rejected_with_400(pool: Pool<Postgres>) -> Result<()> {
// here I am using sea_orm, so I would do the migration inside `get_test_ap`
let app = get_test_app(pool).await?;
let cli = &app.cli;
let resp = cli.get("/subscriptions/confirm").send().await;
resp.assert_status(StatusCode::BAD_REQUEST);
Ok(())
}

如果你使用纯sqlx,你可以按照上面的链接找到下面的例子:

use sqlx::PgPool;
#[sqlx::test(migrations = "foo_migrations")]
async fn basic_test(pool: PgPool) -> sqlx::Result<()> {
let mut conn = pool.acquire().await?;
sqlx::query("SELECT * FROM foo")
.fetch_one(&mut conn)
.await?;

assert_eq!(foo.get::<String>("bar"), "foobar!");

Ok(())
}

相关内容

  • 没有找到相关文章

最新更新