>我正在寻找一个 shell 命令 X,例如,当我执行时:
command_a | X 5000 | command_b
command_a
的stdout
在 5 秒后以command_b
stdin
(至少)写入。
一种延迟缓冲器。
据我所知,buffer
/mbuffer
可以以恒定速率(每秒固定的字节数)写入。相反,我想要一个恒定的时间延迟(t=0 是X
读取command_a
输出块时,在 t=5000 时,它必须将此块写入command_b
)。
[编辑]我已经实现了它:https://github.com/rom1v/delay
我知道你说你正在寻找一个shell命令,但是使用子shell来发挥你的优势呢?像这样:
command_a | (sleep 5; command_b)
所以要grep
一个文件cat
-ed 通过(我知道,我知道,cat
用得不好,但只是一个例子):
cat filename | (sleep 5; grep pattern)
一个更完整的示例:
$ cat testfile
The
quick
brown
fox
$ cat testfile | (sleep 5; grep brown)
# A 5-second sleep occurs here
brown
甚至,正如Michale Kropat所建议的那样,带有sleep
的组命令也可以工作(并且可以说更正确)。这样:
$ cat testfile | { sleep 5; grep brown; }
注意:不要忘记命令后的分号(这里是grep brown
),因为这是必要的!
由于似乎不存在这样的命令 dit,我用 C 实现了它: https://github.com/rom1v/delay
delay [-b <dtbufsize>] <delay>
像这样的东西?
#!/bin/bash
while :
do
read line
sleep 5
echo $line
done
将文件另存为"慢男孩",然后执行
chmod +x slowboy
并运行为
command_a | ./slowboy | command_b
这可能有效
time_buffered () {
delay=$1
while read line; do
printf "%d %sn" "$(date +%s)" "$line"
done | while read ts line; do
now=$(date +%s)
if (( now - ts < delay)); then
sleep $(( now - ts ))
fi
printf "%sn" "$line"
done
}
commandA | time_buffered 5 | commandB
第一个循环用时间戳标记其输入的每一行,并立即将其馈送到第二个循环。第二个循环检查每行的时间戳,并在必要时休眠,直到首次读取后$delay
秒,然后再输出该行。
你的问题引起了我的兴趣,我决定回来玩它。这是 Perl 中的基本实现。它可能不是可移植的(ioctl
),仅在Linux上测试。
基本思想是:
- 每 X 微秒读取一次可用输入
- 将每个输入块存储在哈希中,并将当前时间戳作为键
- 同时在队列(数组)上推送当前时间戳
- 在队列中查找最旧的时间戳,如果延迟足够长的时间,则写入 + 丢弃哈希中的数据
- 重复
最大缓冲区大小
存储的数据有最大大小。如果达到,则在写入后有可用空间之前不会读取其他数据。
性能
它可能不够快,无法满足您的要求(几 Mb/s)。我的最大吞吐量是 639 Kb/s,见下文。
测试
# Measure max throughput:
$ pv < /dev/zero | ./buffer_delay.pl > /dev/null
# Interactive manual test, use two terminal windows:
$ mkfifo data_fifo
terminal-one $ cat > data_fifo
terminal-two $ ./buffer_delay.pl < data_fifo
# now type in terminal-one and see it appear delayed in terminal-two.
# It will be line-buffered because of the terminals, not a limitation
# of buffer_delay.pl
buffer_delay.pl
#!/usr/bin/perl
use strict;
use warnings;
use IO::Select;
use Time::HiRes qw(gettimeofday usleep);
require 'sys/ioctl.ph';
$|++;
my $delay_usec = 3 * 1000000; # (3s) delay in microseconds
my $buffer_size_max = 10 * 1024 * 1024 ; # (10 Mb) max bytes our buffer is allowed to contain.
# When buffer is full, incoming data will not be read
# until space becomes available after writing
my $read_frequency = 10; # Approximate read frequency in Hz (will not be exact)
my %buffer; # the data we are delaying, saved in chunks by timestamp
my @timestamps; # keys to %buffer, used as a queue
my $buffer_size = 0; # num bytes currently in %buffer, compare to $buffer_size_max
my $time_slice = 1000000 / $read_frequency; # microseconds, min time for each discrete read-step
my $sel = IO::Select->new([*STDIN]);
my $overflow_unread = 0; # Num bytes waiting when $buffer_size_max is reached
while (1) {
my $now = sprintf "%d%06d", gettimeofday; # timestamp, used to label incoming chunks
# input available?
if ($overflow_unread || $sel->can_read($time_slice / 1000000)) {
# how much?
my $available_bytes;
if ($overflow_unread) {
$available_bytes = $overflow_unread;
}
else {
$available_bytes = pack("L", 0);
ioctl (STDIN, FIONREAD(), $available_bytes);
$available_bytes = unpack("L", $available_bytes);
}
# will it fit?
my $remaining_space = $buffer_size_max - $buffer_size;
my $try_to_read_bytes = $available_bytes;
if ($try_to_read_bytes > $remaining_space) {
$try_to_read_bytes = $remaining_space;
}
# read input
if ($try_to_read_bytes > 0) {
my $input_data;
my $num_read = read (STDIN, $input_data, $try_to_read_bytes);
die "read error: $!" unless defined $num_read;
exit if $num_read == 0; # EOF
$buffer{$now} = $input_data; # save input
push @timestamps, $now; # save the timestamp
$buffer_size += length $input_data;
if ($overflow_unread) {
$overflow_unread -= length $input_data;
}
elsif (length $input_data < $available_bytes) {
$overflow_unread = $available_bytes - length $input_data;
}
}
}
# write + delete any data old enough
my $then = $now - $delay_usec; # when data is old enough
while (scalar @timestamps && $timestamps[0] < $then) {
my $ts = shift @timestamps;
print $buffer{$ts} if defined $buffer{$ts};
$buffer_size -= length $buffer{$ts};
die "Serious problemn" unless $buffer_size >= 0;
delete $buffer{$ts};
}
# usleep any remaining time up to $time_slice
my $time_left = (sprintf "%d%06d", gettimeofday) - $now;
usleep ($time_slice - $time_left) if $time_slice > $time_left;
}
欢迎在下面发表意见和建议!