rust中新线程上的异步循环:特性"std::future::future"未为"()&q



我知道这个问题已经被问过很多次了,但我仍然不知道该怎么办(下面会详细介绍(。

我正在尝试使用std::thread::spawn生成一个新线程,然后在其中运行一个异步循环

我想运行的异步函数:

#[tokio::main] 
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
let mut scheduler = AsyncScheduler::new();
scheduler.every(10.seconds()).run(move || {
let arc_pool = pg_pool2.clone();
let arc_config = config2.clone();
async {
pull_from_main(arc_pool, arc_config).await;
}
});
tokio::spawn(async move {
loop {
scheduler.run_pending().await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
}

生成要运行的线程:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let handle = thread::spawn(move || async {
pull_tweets(pg_pool2, config2).await;
});
}

错误:

error[E0277]: `()` is not a future
--> src/main.rs:89:9
|
89 |         pull_tweets(pg_pool2, config2).await;
|         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
|
= help: the trait `std::future::Future` is not implemented for `()`
= note: required by `poll`

这里的最后一条评论在概括这个问题方面做得非常好。在某种程度上,似乎需要一个实现IntoFuture的返回值,但我没有。我尝试将Ok(())添加到闭包和函数中,但没有成功。

  • 添加到闭包实际上没有任何作用
  • 添加到async函数中给了我一个新的,但听起来类似的错误:
`Result<(), ()>` is not a future

然后我注意到答案中特别提到了扩展函数,而我没有使用它。本文还讨论了扩展函数。

其他一些SO答案:

  • 这是由缺少async引起的
  • 这个和这个是reqwest库特定的

所以似乎都不起作用。有人能帮我理解1(为什么这里存在错误以及2(如何修复它吗?

注1:将std::thread::spawn替换为tokio::task::spawn_blocking可以很容易地解决所有这些问题。但根据本文,我有目的地尝试线程派生。

注2:关于我想要实现的目标的更广泛的上下文:我在异步循环中从推特上提取150000条推文。我想比较两种实现:在主运行时上运行与在单独的线程上运行。后者是我挣扎的地方。

注意3:在我看来,线程和异步任务是两种不同的基元,它们不会混合。也就是说,生成一个新线程不会影响任务的行为,而生成新任务只会增加现有线程的工作量。如果这种世界观是错误的,请告诉我(以及我能读到的内容(。

#[tokio::main]将您的函数转换为以下内容:

#[tokio::main] 
pub fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
let rt = tokio::Runtime::new();
rt.block_on(async {
let mut scheduler = AsyncScheduler::new();
// ...
});
}

请注意,它是一个同步函数,它生成一个新的运行时并运行内部future以完成。你不需要await,它是一个单独的运行时,有自己的专用线程池和调度程序:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let handle = thread::spawn(move || {
pull_tweets(pg_pool2, config2);
});
}

请注意,您的原始示例在另一方面是错误的:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let handle = thread::spawn(move || async {
pull_tweets(pg_pool2, config2).await;
});
}

即使pull_tweets异步函数,线程也不会做任何事情,因为您所做的只是在async块中创建另一个future。创建的future不会被执行,因为future是懒惰的(无论如何,该线程中没有执行器上下文(。

我会构造代码,以便直接在新线程中生成运行时,并从中调用您想要的任何async函数:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let handle = thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
pull_tweets(pg_pool2, config2).await;
});
});
}
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
// ...
}

相关内容

  • 没有找到相关文章

最新更新