如何在使用rust-sqlx/tokio时取消长时间运行的查询



我正在从rusqlite迁移,在那里我使用get_interrupt_handle从另一个线程立即中止查询(当用户更改过滤器参数时)。

下面是我当前代码的一个示例。我能做的最好的是在每个await之前添加一个中断检查,但如果初始查询需要很长时间才能返回第一个结果,那就没有帮助了。

struct Query {
title: String,
}
fn start_async(requests: crossbeam::channel::Receiver<Query>) {
thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(run_db_thread(requests));
});
}
async fn run_db_thread(requests: crossbeam::channel::Sender<Query>) {
let connection = SqliteConnection::connect("test.sqlite").await?;
loop {
if let Ok(query) = requests.recv() {
do_query(&connection, &query).await?;
}
}
}
async fn do_query(connection: &SqliteConnection, query: &Query) -> Result<(), Box<dyn Error>> {
let mut stream = sqlx::query("SELECT title, authors, series FROM Books where title like ?")
.bind(&format!("%{}%", query.title))
.fetch(&connection);
while let Some(row) = stream.next().await {
let (title, authors, series) = row?;
println!("{} {} {}", title, authors, series);
}
}

是否有一种方法可以在新的Query到达通道时中断正在运行的sqlx执行?如有必要,我很乐意单独发信号。

All期货本质上是可取消的——这是async相对于阻塞多线程代码的一个好处(和危险)。你只需放弃未来,而不是进一步轮询它。

您需要做的第一件事是将阻塞通道更改为async通道-这允许检查通道与运行查询混合。然后,您可以使用各种未来操作工具来决定是否继续运行查询。我决定用select!来做这件事,它轮询几个期货,并根据哪个先完成运行代码。

(可能有更好的工具;我熟悉async的工作原理,但没有写过很多复杂的async代码。

use futures::future::OptionFuture;
use std::time::Duration;
use tokio::sync::mpsc;
async fn run_db_thread(mut requests: mpsc::Receiver<Query>) {
// This variable holds the current query being run, if there is one
let mut current_query_future = OptionFuture::default();
loop {
tokio::select! {
// If we receive a new query, replace the current query with it.
Some(query) = requests.recv() => {
println!("Starting new query {query:?}");
current_query_future = OptionFuture::from(Some(Box::pin(async move {
let answer = do_query(&query).await;
println!("Finished query {query:?} => {answer:?}");
answer
})));
// Note that we did not `.await` the new future.
}
// Poll the current query future, and check if it is done yet.
Some(_answer) = &mut current_query_future => {
// Stop polling the completed future.
current_query_future = None.into();
}
// We get here if both of the above branches saw None, which means that the
// channel is closed, *and* there is no query to run.
else => {
println!("Channel closed; run_db_thread() exiting");
break;
}
}
}
}
/// Example to drive the loop.
#[tokio::main]
async fn main() {
let (sender, receiver) = mpsc::channel(1);
tokio::spawn(run_db_thread(receiver));
for (i, delay) in [1000, 1000, 1, 1, 1, 1000, 1000].into_iter().enumerate() {
sender.send(Query(i)).await.unwrap();
tokio::time::sleep(Duration::from_millis(delay)).await;
}
println!("main() exiting");
}
// Skeleton data types to make the example compile
#[derive(Debug)]
#[allow(dead_code)]
struct Query(usize);
#[derive(Debug)]
struct Answer;
async fn do_query(_q: &Query) -> Answer {
tokio::time::sleep(Duration::from_millis(100)).await;
Answer
}

这个示例代码打印:

Starting new query Query(0)
Finished query Query(0) => Answer
Starting new query Query(1)
Finished query Query(1) => Answer
Starting new query Query(2)
Starting new query Query(3)
Starting new query Query(4)
Starting new query Query(5)
Finished query Query(5) => Answer
Starting new query Query(6)
Finished query Query(6) => Answer
main() exiting

也就是说,查询0、1、5和6已经完成,但是查询2、3和4在它们完成之前被一个新的查询取消了。

最新更新