Rust中用于HTTP服务器推送(流式传输)的客户端



由于缺少更好的示例,我们假设我想用Rust编写一个简单的客户端,该客户端可以建立连接并从Twitter的HTTP Streaming API接收数据。这可能吗?我一直在关注Iron和Nickel,它们看起来是很好的框架,但我认为它们还没有这个功能?

http客户端超支持增量读取响应(就像任何实现rust的Reader特性的东西一样),但我找不到任何东西来增量解析响应,或者实现twitter的特定协议(用rn结束objecs)。

也就是说,我能够实现一个快速的概念验证:

编辑:在github上查看并玩它。

use rustc_serialize::json::Json;
use std::str;
pub trait JsonObjectStreamer {
    fn json_objects(&mut self) -> JsonObjects<Self>;
}
impl<T: Buffer> JsonObjectStreamer for T {
    fn json_objects(&mut self) -> JsonObjects<T> {
        JsonObjects { reader: self }
    }
}
pub struct JsonObjects<'a, B> where B: 'a {
    reader: &'a mut B
}
impl<'a, B> Iterator for JsonObjects<'a, B> where B: Buffer + 'a {
    type Item = Json;
    fn next(&mut self) -> Option<Json> {
        let mut line_bytes = match self.reader.read_until(b'r') {
            Ok(bytes) => bytes,
            Err(_)    => return None,
        };
        if line_bytes.last() == Some(&b'r') {
            // drop the r
            line_bytes.pop();
            // skip the n
            match self.reader.read_char() {
                Ok(_)  => (),
                Err(_) => return None,
            }
        }
        let line = match str::from_utf8(&line_bytes) {
            Ok(line) => line,
            Err(_)   => return None
        };
        Json::from_str(line).ok()
    }
}

用法:(假设您已将其放在项目的src/json_streamer.rs文件中)

#![feature(io)]
extern crate hyper;
extern crate "rustc-serialize" as rustc_serialize;
mod json_streamer;
use hyper::Client;
use std::old_io::BufferedReader;
use json_streamer::JsonObjectStreamer;
fn main() {
    let mut client = Client::new();
    let res = client.get("http://localhost:4567/").send().unwrap();
    for obj in BufferedReader::new(res).json_objects() {
        println!("object arrived: {}", obj);
    }
}

我用这个小的sinatra应用程序来测试它:

require 'sinatra'
require 'json'
class Stream
  def each
    hash = { index: 0 }
    loop do
      hash[:index] += 1
      yield hash.to_json + "rn"
      sleep 0.5
    end
  end
end
get '/' do
  Stream.new
end

最新更新