如何在不等待的情况下对未来州进行民意调查



我正在尝试使用Future.rs在一个单独的过程中管理一些任务。我知道如何等待每个创建的未来,以及如何一个接一个地处理它们,但我无法在执行过程中轮询未来以了解其状态。我总是有错误:

线程"main"对"当前没有任务正在运行"感到恐慌

我想在未来的处理过程中做一些事情,直到它完成。也许我使用它的方式不对?我设法通过使用一个频道使其发挥作用,但我认为应该可以对未来进行民意调查,并在准备好后得出结果。我用来测试它的代码是:

fn main() {
println!("test future");
let thread_pool = CpuPool::new(4);
let mut future_execution_list = vec![];
let mutex = Arc::new(AtomicUsize::new(0));
//create the future to process
for _ in 0..10 {
let send_mutex = mutex.clone();
let future = thread_pool.spawn_fn(move || {
//Simulate long processing
thread::sleep(time::Duration::from_millis(10));
let num = send_mutex.load(Ordering::Relaxed);
send_mutex.store(num + 1, Ordering::Relaxed);
let res: Result<usize, ()> = Ok(num);
res
});
future_execution_list.push(future);
}
// do the job
loop {
for future in &mut future_execution_list {
match future.poll() {
Ok(Async::NotReady) => (), //do nothing
Ok(Async::Ready(num)) => {
//update task status
println!(" future {:?}", num);
}
Err(_) => {
//log error and set task status to err
()
}
};
}
//do something else
}
}

所以我在谢普马斯特回答之后完成了我的问题。你的话很有意思,但我仍然找不到解决我问题的办法。我会补充一些关于我的问题的信息。我想在一个可以同时管理多个任务的自动化系统上安排任务。存在一个循环,其中管理事件并计算任务调度。当一个任务被安排时,它就产生了。当任务结束时,将执行新的日程安排。在任务执行期间,对事件进行管理。特殊代码可以是:

loop {
event.try_recv() { ...} //manage user command for exemple
if (schedule) {
let tasks_to_spawn = schedule_task();
let futures = tasks_to_spawn.map(|task| {
thread_pool.spawn_fn( ....)}); 
let mut one = future::select_all(futures);
while let Ok((value, _idx, remaining)) = one.wait() {..} //wait here
}
//depend on task end state and event  set schedule to true or false.
}

我可以在未来将日程安排和任务联合起来,比如:

let future = schedule.and_them(|task| execute_task);

但是我仍然需要等待第一个任务的执行结束。我可以把一切都放在未来(事件管理、日程安排、任务),然后像你提议的那样等待第一个结束。我试过了,但我不知道如何用不同的Item和Error类型创建未来的vec。有了这个概念,我必须在线程之间管理更多的数据。事件管理和调度不必在不同的线程中执行。

我看到另一个问题,select_all拥有vec的所有权。如果在执行另一个任务的过程中必须安排一个新任务,我如何更改vec并添加新的future?

不知道你是否有一个简单的解决方案。我在想,在执行过程中用isDone()这样的方法获取未来的状态很简单,无需等待。也许这是有计划的,我没有看到公关。如果你有一个简单的解决方案,那就太好了,否则我会重新思考我的想法。

要轮询Future,您必须有一个Task。要获得Task,您可以在传递给futures::executor::spawn()Future中进行轮询。如果你像这样重写你的例子的loop

futures::executor::spawn(futures::lazy(|| {
// existing loop goes here
})).wait_future();

它在运行。

至于为什么一个Future只能在一个任务中被轮询,我相信这是因为轮询可以调用Task::Unpark

我想在未来处理时做点什么

据我所知,这就是期货的含义——可以并行发生的事情。如果你想做其他事情,那就创造另一个未来并投入其中!

您基本上已经在做这件事了——您的每个线程都在"做其他事情"。

轮询未来,当它准备好时获得结果

使用future::select_all,您可以组合多个期货并获得先完成的期货。然后由你决定是否等待下一个。

一种潜在的实施方式:

extern crate rand;
extern crate futures;
extern crate futures_cpupool;
use rand::Rng;
use futures::{future, Future};
use futures_cpupool::CpuPool;
use std::{thread, time};
fn main() {
let thread_pool = CpuPool::new(4);
let futures = (0..10).map(|i| {
thread_pool.spawn_fn(move || -> Result<usize, ()> {
let mut rng = rand::thread_rng();
// Simulate long processing
let sleep_time = rng.gen_range(10, 100);
let sleep_time = time::Duration::from_millis(sleep_time);
for _ in 0..10 {
println!("Thread {} sleeping", i);
thread::sleep(sleep_time);
}
Ok(i)
})
});
let mut one = future::select_all(futures);
while let Ok((value, _idx, remaining)) = one.wait() {
println!("Future #{} finished", value);
if remaining.is_empty() {
break;
}
one = future::select_all(remaining);
}
}

在调用wait的过程中,同时发生了多件事!这可以从交织输出中看出:

Thread 2 sleeping
Thread 0 sleeping
Thread 3 sleeping
Thread 1 sleeping
Thread 3 sleeping
Thread 0 sleeping
Thread 1 sleeping
Thread 2 sleeping
Thread 3 sleeping

您可以通过将每个线程的睡眠时间设置为1秒并对整个程序计时来验证事情是否并行发生。由于有10个期货,耗时1秒,并行度为4,因此整个程序运行耗时3秒。


奖金代码审查:

  1. 不要拆分加载和设置原子变量以实现递增-存储的值可能已被另一个线程在两次调用之间更改。使用fetch_add
  2. 您真的,真的在使用订单之前应该知道订单的含义。我不认识他们,所以我总是用SeqCst
  3. 由于它对这个例子并不重要,所以我完全删除了原子变量
  4. 总是喜欢收集到Vec中,而不是在循环中推进。这允许进行更优化的分配
  5. 在这种情况下,根本不需要Vec,因为select_all接受任何可以变成迭代器的东西

最新更新