重定向Lwt通道



我有一个在Unix套接字上运行的ssh服务,我有一个本地TCP服务器,我想让它指向Unix套接字的通道。

基本上当我这样做的时候:

$ ssh root@localhost -p 2000

然后,我的本地TCP服务器获取请求并将其传输到Unix套接字,TCP客户端(在本例中为ssh)从Unix套接字获得应答。相关代码:

let running_tunnel debug (tcp_ic, tcp_oc) () =
Lwt_io.with_connection a_unix_addr begin fun (mux_ic, mux_oc) ->
let%lwt _ = some_call with_an_arg
and _ =
(* Some setup code *)

let rec forever () =
Lwt_io.read_line tcp_ic >>= fun opening_message ->
Lwt_io.write_from_string_exactly
mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
Lwt_io.read_line mux_ic >>= fun reply ->
Lwt_io.printl reply >>= fun () ->
Lwt_io.write_line tcp_oc reply >>= fun () ->
forever ()
in
forever ()
in
Lwt.return_unit
end

这是可行的。当我在命令行上调用ssh时,它会"卡住",但我知道我正在通过一些数据,因为另一方的ssh头是正确的,SSH-2.0-OpenSSH_6.7。我还让我的端打印出初始ssh握手的更多部分,例如,我看到这样打印:

??^?W񦛦zJ?~??curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group14-sha1ssh-rsa,ssh-dss>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1none,zlib@openssh.comnone,zlib@openssh.co 

等,这似乎是正确的。我认为挂起的原因是因为我使用的是Lwt_io.read_line,所以我试着这样做:

let rec forever () =
Lwt_io.read tcp_ic >>= fun opening_message ->
Lwt_io.write_from_string_exactly
mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
Lwt_io.read mux_ic >>= fun reply ->
Lwt_io.printl reply >>= fun () ->
Lwt_io.write tcp_oc reply >>= fun () ->
forever ()
in
forever ()

实际上更糟糕,它甚至没有打印出最初的握手。我也试过专用的{write,read}_into…成功有限的功能。在strace/d休战下运行,我看到的最终结果如下:

read(0x6, "SSH-2.0-OpenSSH_6.9rn", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.9n", 0x14)      = 20 0
read(0x7, "", 0x1000)      = -1 Err#35
write(0x7, "SSH-2.0-OpenSSH_6.9", 0x13)        = 19 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x7, "SSH-2.0-OpenSSH_6.7rn", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.7n", 0x14)      = 20 0
read(0x6, "", 0x1000)      = -1 Err#35
write(0x6, "SSH-2.0-OpenSSH_6.7n", 0x14)      = 20 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x6, "", 0x1000)      = 1968 0
read(0x6, "", 0x1000)      = -1 Err#35
^C

其中6.9是我的本地机器的ssh, 6.7是Unix套接字后面的远程机器。有一件事对我来说似乎很奇怪,那就是r是如何被删除的,这将读/写计数更改为1。我不确定这是否是关键的区别。

理想情况下,我希望从Lwt中获得某种抽象,即无论何时在这个可读通道(TCP套接字)上有可用的数据,都将其直接写入可写通道(Unix套接字),反之亦然。

readline的变体不起作用,因为数据流是二进制的,而readline用于基于文本的行输入。Lwt_io.read函数的第二个变体不起作用,因为该函数将读取所有输入,直到最后,除非您指定了可选的count参数。这意味着,只有在读取器端的EOF之后,控制才会传递给write。使用Lwt_io.read和一些计数,例如,Lwt_io.read ~count:1024 mux_ic将不是一个非常坏的主意。此外,如果您希望流是有限的,则不应忘记检查返回值。read_into应该小心使用,因为与read函数不同,它不能保证它将读取您请求的确切数据量。换句话说,将会有简短的阅读。对于write_into函数也是如此。这个函数的_exactly版本没有这个问题,所以最好使用它们。

还有一件事,你应该考虑。Lwt_io提供了一个缓冲输入输出接口。这意味着,该模块中的所有函数都在向某个内部缓冲区写入和读取,而不是通过设备描述符直接与操作系统交互。这意味着,当您将数据从一个缓冲源传输到另一个缓冲源时,您将在两端出现一些意想不到的延迟。所以你应该预料到他们会使用冲水。否则,当你有双向交互时,它们可能会引入竞争条件。

此外,尽管缓冲的io大大简化了事情,但它是有代价的。事实上,您有几个不必要的缓冲区层,当您使用Lwt_io时,您还分配了许多不必要的数据,将内存与垃圾一起浪费。问题是Lwt_io有自己的内部缓冲区,它不会为普通用户显示,并且所有返回数据或使用数据的函数都需要执行额外的内部函数或内部函数的复制操作。例如,使用Lwt_io.{read,write},将执行以下操作:

  1. 从内核拷贝数据到内部缓冲区
  2. 分配字符串
  3. 将数据从内部缓冲区复制到分配的字符串
  4. (现在是write部分)将数据从字符串复制到内部缓冲区
  5. 从内部缓冲区复制数据到内核。
  6. (在GC内部的某个地方,有时稍后)复制分配的字符串从小堆到大堆(如果字符串足够小,可以放入小堆),或者将它从一个位置复制到另一个位置(如果压缩算法决定移动它,并且字符串仍然存在,如果生产者超过了消费者和读取数据的生命周期,这是很有可能的

看起来我们可以去掉2、3、4和6中的副本。我们可以使用我们自己的缓冲区,将数据从内核复制到它,然后将数据从这个内核复制回内核。我们甚至可以通过使用splice和tee系统调用,在内核缓冲区之间直接复制数据,而不涉及用户空间,从而消除1和5中的副本。但在这种情况下,我们将失去检查数据的能力,而这通常是我们想要的。

那么,让我们尝试从内核空间中删除除了副本之外的所有副本。我们可以在Lwt_io中使用一个低级接口来访问内部缓冲区,比如direct_access和新添加的block函数,但这需要了解Lwt_io的内部,这不是很简单,但仍然是可行的。相反,我们将使用一个更简单的方法,即使用Lwt_unix库。这个库直接与内核交互,没有任何中间缓冲区,让我们自己缓冲。

open Lwt.Infix
let bufsiz = 32768
let echo ic oc =
let buf = Lwt_bytes.create bufsiz in
let rec loop p =
let p = p mod bufsiz in
Lwt_bytes.read ic buf p (bufsiz - p) >>= function
| 0 -> Lwt.return ()
| n -> Lwt_bytes.write oc buf p n >>= loop in
loop 0

这将实现一个简单而快速的数据复制,这将复制数据与cat程序相同的速度。不过,仍有一些改进的余地。例如,为了鲁棒性(特别是EINTR信号),应该添加错误处理。另外,这个函数实现同步复制,其中输入和输出是紧密锁定的。有时这不是一个选择。考虑以下示例,输入是一个UDP套接字,它可能很容易超过消费者,并且数据将被丢弃,即使平均而言生产者比消费者慢。要处理这个问题,您需要将读取器和写入器拆分为两个单独的线程,它们通过一些弹性队列进行通信。

Lwt是一个非常低级的库,它不能也不应该为你解决这些问题。它提供了一些机制,可用于为每种特定情况构建解决方案。有些库确实为一些常见模式提供了解决方案,mq和纳米消息就是很好的例子。

<标题>更新我可能是太低级的家伙,也可能是我挖得太深。如果您确实在寻找一种高级方法,那么您应该使用Lwt_stream,在这种情况下,您可以将foo.pipe(bar).pipe(foo)的节点等效编码为
let echo ic oc = Lwt_io.(write_chars oc (read_chars ic))

当然,这会慢得多,但这取决于您的任务。

是的,要执行双向重定向,您应该只运行两个线程,像这样:echo ic oc <&> echo ic oc对于具有文件描述符的版本,这两个线程都是可写的。如果您使用的是Lwt_io通道,它像管道一样是单向的,那么您将为每个部分获得两个端点。我们将它们分别命名为fifo,分别代表前端输入和输出,bibo代表后端。然后,您需要像这样连接它:echo fo bi <&> echo bo fi,使用第二个版本的echo与流。

性能成本

通常,高级抽象是以性能为代价的。在我们的特殊情况下,使用第一个版本的echo,其吞吐量超过每秒1Gb。具有流的版本的平均吞吐量为5MB/s。根据您的设置,它可能工作,也可能不工作。对于常规的ssh会话来说,这已经足够了,但是可能会对本地网络中的scp产生影响。

最新更新