我正在使用Rust和tonic构建gRPC服务器,并且在返回流的函数中存在一些问题。到目前为止,我看到的唯一示例是在函数中方便地创建tx和rx通道——但是如果需要从应用程序的其他部分接收数据,这并没有多大帮助。我有下面的代码,但我得到一个错误。
use std::sync::Arc;
use std::sync::Mutex;
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status};use resourcemanager::{LineRequest, LineResponse, Position};
use resourcemanager::resource_manager_server::{ResourceManager, ResourceManagerServer};
pub mod resourcemanager {
tonic::include_proto!("resourcemanager");
}
#[derive(Debug)]
pub struct ResourceManagerService {
linear_rx: mpsc::Receiver<Position>,
linear_tx: mpsc::Sender<Position>
}
#[tonic::async_trait]
impl ResourceManager for ResourceManagerService {
async fn draw_line(&self, request: Request<LineRequest>) -> Result<Response<LineResponse>, Status> {
Ok(Response::new(LineResponse::default()))
}
type StreamLinearMotorMovementStream = ReceiverStream<Result<Position, Status>>;
async fn stream_linear_motor_movement(
&self,
request: Request<()>
) -> Result<Response<Self::StreamLinearMotorMovementStream>, Status> {
println!("Streaming motor movements");
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(async move {
while let Some(received) = self.linear_rx.recv().await {
tx.send(Ok(received.clone())).await.unwrap();
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
fn main() {
println!("Hello, world!");
}
误差
error[E0759]: `self` has lifetime `'life0` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:30:10
|
30 | &self,
| ^^^^ this data with lifetime `'life0`...
...
36 | tokio::spawn(async move {
| ------------ ...is used and required to live as long as `'static` here
|
note: `'static` lifetime requirement introduced by this bound
--> /Users/xxxxxxxx/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.18.2/src/task/spawn.rs:127:28
|
127 | T: Future + Send + 'static,
| ^^^^^^^
这个错误显示在&self
下面:
async fn stream_linear_motor_movement(
&self,
request: Request<()>
)
错误信息基本上说明了一切。缩写:
async fn stream_linear_motor_movement(&self) {
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(async move {
while let Some(received) = self.linear_rx.recv().await {}
});
}
新生成的任务中的片段self.linear_rx.recv().await
强制编译器将self
移到闭包中,因此闭包可以访问self.linear_rx
。然而,由于新任务可以永远运行,它要求其捕获的上下文具有'static
的生存期,而&self
具有life0
的有限生存期,可能比'static
短(无论结果是什么)。这意味着您不能从新生成的任务中访问self
(或从它派生的任何内容),因为无法保证在任务执行时它将在周围。
您可以做的是将ResourceManagerService
中的linear_rx
移动到Arc
,.clone()
和stream_linear_motor_movement
中的Arc
中,并将克隆移动到闭包中。根据您想要完成的任务,您还可以将linear_rx
移动到Option
和.take()
中,Option
在stream_linear_motor_movement
中,将None
留在其位置。在这两种情况下,您都将拥有的对象转移到新生成的任务中,该任务的生存期不会短于'static
。请注意,Arc
允许多次调用stream_linear_motor_movement
,而Option
只允许调用一次(因为linear_rx
在第一次调用时被移走了)。