C - LIBUV 分配的内存缓冲区重用技术



我正在将 libuv 用于我广泛的网络交互应用程序,我担心哪些重用分配内存的技术会同时高效且安全,libuv 回调延迟执行。

在非常基本的层,暴露给libuv用户,需要指定缓冲区分配回调以及设置句柄读取器:

UV_EXTERN int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);

uv_alloc_cb在哪里

typedef void (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);

但问题是:每次通过句柄传入新消息时都会调用此内存分配回调(例如,接收来自句柄的每个 UDP 数据报uv_udp_t并且为每个传入的 UDP 数据报向前分配新缓冲区似乎非常非内存方面。

所以我要求一种常见的 C 技术(可能在 libuv 回调系统引入的延迟执行上下文中(,尽可能重用相同的分配内存。

另外,如果可能的话,我想保持Windows便携性。

笔记:

  • 我知道这个问题:libuv 是否提供任何将缓冲区附加到连接并重新使用它的设施; 它被接受的答案除了说明静态分配缓冲区是不行的事实之外,并没有回答如何使用 libuv 实际正确进行内存分配。特别是,它没有涵盖附加到句柄的缓冲区的安全性(在同一缓冲区上使用延迟写入回调,这可能与 libuv 主循环的多次迭代中的另一个读取回调调用重叠(方面(通过包装结构或句柄>数据上下文(。
  • 阅读 http://nikhilm.github.io/uvbook/filesystem.html,我注意到截图uvtee/main.c - Write to pipe下的以下短语:

    我们制作一个副本,以便我们可以将两个缓冲区从两个调用中释放出来,彼此独立地write_data。虽然对于这样的演示程序来说是可以接受的,但您可能希望更智能的内存管理,例如引用计数缓冲区或任何主要应用程序中的缓冲区池。

    但是我找不到任何涉及 libuv 缓冲区引用计数的解决方案(如何正确执行此操作?(或 libuv 环境中缓冲区池的显式示例(是否有任何库?

我想

分享我自己解决这个问题的经验。我能感受到你的痛苦和困惑,但实际上,如果你知道自己在做什么,考虑到你拥有的大量选择,实施一个可行的解决方案并不太困难。

目的

  1. 实现能够执行两个操作的缓冲区池 - 获取释放

  2. 基本池化策略:

    • 获取从池中抽取缓冲区,有效地将可用缓冲区数量减少 1;
    • 如果没有可用的缓冲区,则会出现两个选项:
      • 增大池并返回新创建的缓冲区;或
      • 创建并返回虚拟缓冲区(如下所述(。
    • 释放将缓冲区返回到池。
  3. 池可以是固定的,也可以是可变大小的。"变量">意味着最初有 M 个预分配缓冲区(例如零(,并且池可以按需增长到 N."固定">意味着所有缓冲区在池创建时都预先分配 (M = N(。

  4. 实现一个为 libuv 获取缓冲区的回调。

  5. 不允许无限池增长在任何情况下仍使池正常运行,内存不足的情况除外。

实现

现在,让我们更详细地了解所有这些。

池结构:

#define BUFPOOL_CAPACITY 100
typedef struct bufpool_s bufpool_t;
struct bufpool_s {
    void *bufs[BUFPOOL_CAPACITY];
    int size;
};

size是当前池大小。

缓冲区本身是一个以以下结构为前缀的内存块:

#define bufbase(ptr) ((bufbase_t *)((char *)(ptr) - sizeof(bufbase_t)))
#define buflen(ptr) (bufbase(ptr)->len)
typedef struct bufbase_s bufbase_t;
struct bufbase_s {
    bufpool_t *pool;
    int len;
};

len是缓冲区的长度(以字节为单位(。

新缓冲区的分配如下所示:

void *bufpool_alloc(bufpool_t *pool, int len) {
    bufbase_t *base = malloc(sizeof(bufbase_t) + len);
    if (!base) return 0;
    base->pool = pool;
    base->len = len;
    return (char *)base + sizeof(bufbase_t);
}

请注意,返回的指针指向标头之后的下一个字节 - 数据区域。这允许使用缓冲区指针,就好像它们是通过对malloc的标准调用分配的。

释放则相反:

void bufpool_free(void *ptr) {
    if (!ptr) return;
    free(bufbase(ptr));
}

libuv 的分配回调如下所示:

void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
    int len;
    void *ptr = bufpool_acquire(handle->loop->data, &len);
    *buf = uv_buf_init(ptr, len);
}

您可以在此处看到,alloc_cb从循环中的用户数据指针获取缓冲池的指针。这意味着缓冲池应在使用前附加到事件循环。换句话说,您应该在创建循环时初始化池,并将其指针分配给data字段。如果您已经在该字段中保存其他用户数据,只需扩展您的结构即可。

虚拟缓冲区是一个假缓冲区,这意味着它不是源自池,但仍然功能齐全。虚拟缓冲区的目的是在池饥饿的罕见情况下保持整个工作,即当所有缓冲区都被获取并且需要另一个缓冲区时。根据我的研究,在所有现代操作系统上,大约 8Kb 的小内存块的分配都非常快 - 这非常适合虚拟缓冲区的大小。

#define DUMMY_BUF_SIZE 8000
void *bufpool_dummy() {
    return bufpool_alloc(0, DUMMY_BUF_SIZE);
}

获取操作:

void *bufpool_acquire(bufpool_t *pool, int *len) {
    void *buf = bufpool_dequeue(pool);
    if (!buf) buf = bufpool_dummy();
    *len = buf ? buflen(buf) : 0;
    return buf;
}

发布操作:

void bufpool_release(void *ptr) {
    bufbase_t *base;
    if (!ptr) return;
    base = bufbase(ptr);
    if (base->pool) bufpool_enqueue(base->pool, ptr);
    else free(base);
}

这里有两个函数 - bufpool_enqueuebufpool_dequeue .基本上,他们执行游泳池的所有工作。

就我而言,在上述之上有一个 O(1( 缓冲区索引队列,它允许我更有效地跟踪池的状态,非常快速地获取缓冲区索引。没有必要像我那样极端,因为池的最大大小是有限的,因此任何数组搜索在时间上也是恒定的。

在最简单的情况下,您可以将这些函数实现为纯线性搜索器,bufpool_s结构中的整个bufs数组。例如,如果获取了缓冲区,则搜索第一个非 NULL 点,保存指针并将 NULL 放在该点。下次释放缓冲区时,搜索第一个 NULL 点并将其指针保存在其中。

池内部如下:

#define BUF_SIZE 64000
void *bufpool_grow(bufpool_t *pool) {
    int idx = pool->size;
    void *buf;
    if (idx == BUFPOOL_CAPACITY) return 0;
    buf = bufpool_alloc(pool, BUF_SIZE);
    if (!buf) return 0;
    pool->bufs[idx] = 0;
    pool->size = idx + 1;
    return buf;
}
void bufpool_enqueue(bufpool_t *pool, void *ptr) {
    int idx;
    for (idx = 0; idx < pool->size; ++idx) {
        if (!pool->bufs[idx]) break;
    }
    assert(idx < pool->size);
    pool->bufs[idx] = ptr;
}
void *bufpool_dequeue(bufpool_t *pool) {
    int idx;
    void *ptr;
    for (idx = 0; idx < pool->size; ++idx) {
        ptr = pool->bufs[idx];
        if (ptr) {
            pool->bufs[idx] = 0;
            return ptr;
        }
    }
    return bufpool_grow(pool);
}

正常的缓冲区大小为 64000 字节,因为我希望它舒适地适合带有标头的 64Kb 块。

最后,初始化和取消初始化例程:

void bufpool_init(bufpool_t *pool) {
    pool->size = 0;
}
void bufpool_done(bufpool_t *pool) {
    int idx;
    for (idx = 0; idx < pool->size; ++idx) bufpool_free(pool->bufs[idx]);
}

请注意,出于说明目的,此实现已简化。这里没有池收缩策略,而在现实世界中,很可能是必需的。

用法

您现在应该能够编写 libuv 回调:

void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    /* ... */
    bufpool_release(buf->base); /* Release the buffer */
}

循环初始化:

uv_loop_t *loop = malloc(sizeof(*loop));
bufpool_t *pool = malloc(sizeof(*pool));
uv_loop_init(loop);
bufpool_init(pool);
loop->data = pool;

操作:

uv_tcp_t *tcp = malloc(sizeof(*tcp));
uv_tcp_init(tcp);
/* ... */
uv_read_start((uv_handle_t *)tcp, alloc_cb, read_cb);

更新 (02 Aug 2016(

根据请求的大小获取缓冲区时使用自适应策略也是一个好主意,并且仅在请求大量数据(例如,所有读取和长时间写入(时才返回池化缓冲区。对于其他情况(例如大多数写入(,返回虚拟缓冲区。这将有助于避免浪费池化缓冲区,同时保持可接受的分配速度。例如:

void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
    int len = size; /* Requested buffer size */
    void *ptr = bufpool_acquire(handle->loop->data, &len);
    *buf = uv_buf_init(ptr, len);
}
void *bufpool_acquire(bufpool_t *pool, int *len) {
    int size = *len;
    if (size > DUMMY_BUF_SIZE) {
        buf = bufpool_dequeue(pool);
        if (buf) {
            if (size > BUF_SIZE) *len = BUF_SIZE;
            return buf;
        }
        size = DUMMY_BUF_SIZE;
    }
    buf = bufpool_alloc(0, size);
    *len = buf ? size : 0;
    return buf;
}

附言不需要buflenbufpool_dummy这个片段。

如果你在Linux上,你很幸运。Linux内核通常默认使用所谓的SLAB分配器。此分配器的优点是,它通过维护可回收块池来减少实际的内存分配。这对你来说意味着,只要你总是分配相同大小的缓冲区(理想情况下是 pow2 大小的 PAGE_SIZE(,你就可以在 Linux 上使用malloc()

如果您不是在 Linux(或 FreeBSD 或 Solaris(上,或者如果您开发了一个跨平台的应用程序,您可以考虑使用 glib 及其Memory Slices,它们是 SLAB 分配器的跨平台实现。它在支持它的平台上使用本机实现,因此在 Linux 上使用它不会带来任何优势(我自己运行了一些测试(。我相信还有其他库可以做同样的事情,或者你可以自己实现它。

让我们重复回调的函数签名:

void alloc_cb(uv_handle_t* handle, size_t, uv_buf_t*);

我将handle->data设置为指向结构/对/元组,例如:

auto t(std::make_tuple(blah1, blah2, blah3));

这允许我与 cb 共享任意数据。我所做的是将结构/对/元组数据成员之一设置为我的缓冲区:

char data[65536];

然后我只使用 cb 中的缓冲区:

extern "C"
inline void uv_alloc_cb(uv_handle_t* const uvh, std::size_t const sz,
  uv_buf_t* const buf) noexcept
{
  auto const p(static_cast<std::pair<void*, char*>*>(uvh->data));
  buf->base = std::get<1>(*p);
  buf->len = 65536;
}

这是超快的,不需要动态分配。我认为libuv API 是临时的,根本没有经过深思熟虑,并且缺乏实现。为什么需要这种任意的 64k 缓冲区?如果我不提供 64k,libuv一点也不高兴,尽管它不会崩溃。

最新更新