How to improve libevent bufferevent_write performance



For my test case, messages carry only "00" and are sent by bufferevent_write.
Case 1: 20,000 TCP connections, sending "00" to each every 10 s, takes 0.15 s.
Case 2: only 1 TCP connection, sending "00" 20,000 times every 10 s, takes 0.015 s.

Please give me some suggestions for improving bufferevent_write performance.

I just want it to be as fast as possible, and I wonder: if bufferevent_write is asynchronous, why is sending 20k messages to 1 TCP connection so much faster than sending 1 message to each of 20k TCP connections?

CPU info:
Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              16
On-line CPU(s) list: 0-15
Thread(s) per core:  2
Core(s) per socket:  8
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               85
Model name:          Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping:            7
CPU MHz:             2500.000
BogoMIPS:            5000.00
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            1024K
L3 cache:            36608K
NUMA node0 CPU(s):   0-15
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl cpuid tsc_known_freq pni pclmulqdq monitor ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat avx512_vnni
Memory info:
32G

The whole test case

#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/thread.h>
#include <netinet/tcp.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstring>
#include <ctime>
#include <iostream>
#include <mutex>
#include <set>
#include <string>
#include <thread>

using namespace std::chrono_literals;

static event_base *kEventBase{nullptr};
static evconnlistener *kListener{nullptr};
static std::set<bufferevent *> kSessions{};
static std::mutex kSessionsMutex{};
static std::atomic_bool kRunning{false};

static void stop() {
  kRunning = false;
  if (kListener != nullptr) {
    evconnlistener_disable(kListener);
    std::cout << "normal listener stopped" << std::endl;
  }
  struct timeval local_timeval = {1, 0};
  if (kEventBase != nullptr) { event_base_loopexit(kEventBase, &local_timeval); }
}

static void handler(int sig) {
  std::cout << "get signal: " << sig << std::endl;
  stop();
}

static void ReadCallback(bufferevent *event, void *) {
  auto buffer = evbuffer_new();
  evbuffer_add_buffer(buffer, bufferevent_get_input(event));
  auto data_size = evbuffer_get_length(buffer);
  std::string data(data_size, '\0');  // avoids a non-standard VLA
  evbuffer_remove(buffer, &data[0], data_size);
  evbuffer_free(buffer);
  std::cout << "get data: " << data << std::endl;
}

static void EventCallback(bufferevent *event, short events, void *) {
  if (events & BEV_EVENT_EOF) {
    std::cout << "socket EOF" << std::endl;
  } else if (events & BEV_EVENT_ERROR) {
    std::cout << "socket error: " << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
              << std::endl;
  } else if (events & BEV_EVENT_TIMEOUT) {
    std::cout << "socket read/write timeout" << std::endl;
  } else {
    std::cout << "unhandled socket events: " << std::to_string(events) << std::endl;
  }
  {
    std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
    kSessions.erase(event);
    bufferevent_free(event);
  }
}

static void listenerCallback(evconnlistener *, evutil_socket_t socket, sockaddr *, int, void *) {
  bufferevent *event =
      bufferevent_socket_new(kEventBase, socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
  if (event == nullptr) {
    std::cout << "create buffer event failed" << std::endl;
    return;
  }
  int enable = 1;
  setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (void *)&enable, sizeof(enable));
  setsockopt(socket, IPPROTO_TCP, TCP_QUICKACK, (void *)&enable, sizeof(enable));
  bufferevent_setcb(event, ReadCallback, nullptr, EventCallback, nullptr);
  bufferevent_enable(event, EV_WRITE | EV_READ);
  // kSessions is also iterated by the sender thread, so the insert must be locked too.
  std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
  kSessions.emplace(event);
}

int main(int argc, const char **argv) {
  signal(SIGTERM, handler);
  signal(SIGINT, handler);
  evthread_use_pthreads();
  // init
  kEventBase = event_base_new();
  if (kEventBase == nullptr) {
    std::cout << "cannot create event_base_miner_listener_" << std::endl;
    return -1;
  }
  sockaddr_in local_sin{};
  bzero(&local_sin, sizeof(local_sin));
  local_sin.sin_family = AF_INET;
  local_sin.sin_port = htons(1800u);
  local_sin.sin_addr.s_addr = htonl(INADDR_ANY);
  kListener = evconnlistener_new_bind(kEventBase,
                                      listenerCallback,
                                      nullptr,
                                      LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
                                      -1,
                                      reinterpret_cast<sockaddr *>(&local_sin),
                                      static_cast<int>(sizeof(local_sin)));
  if (kListener == nullptr) {
    std::cout << "cannot create normal listener" << std::endl;
    return -1;
  }
  kRunning = true;
  std::thread thread_send_message([]() {
    while (kRunning) {
      {
        // case 1: send "00" to each of 20,000 TCP connections; it costs 0.15 s.
        std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
        std::clock_t clock_start = std::clock();
        for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
        std::cout << "send message to all done, client count: " << kSessions.size()
                  << ", elapsed: " << std::clock() - clock_start << std::endl;
      }
      {
        // case 2: send "00" 20,000 times to a single TCP connection; it costs 0.015 s.
        // std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
        // for (auto &it : kSessions) {
        //   std::clock_t clock_start = std::clock();
        //   for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
        //   std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
        //             << std::endl;
        // }
      }
      std::this_thread::sleep_for(10s);
    }
  });
  event_base_dispatch(kEventBase);
  if (thread_send_message.joinable()) { thread_send_message.join(); }
  {
    std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
    for (auto &it : kSessions) { bufferevent_free(it); }
    kSessions.clear();
  }
  if (kListener != nullptr) {
    evconnlistener_free(kListener);
    kListener = nullptr;
  }
  if (kEventBase != nullptr) {
    event_base_free(kEventBase);
    kEventBase = nullptr;
  }
}

Minimal reproducible example

// case 1: 20,000 TCP connections, sending "00" to each every 10 s; it costs 0.15 s.
std::clock_t clock_start = std::clock();
for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
std::cout << "send message to all done, client count: " << kSessions.size()
          << ", elapsed: " << std::clock() - clock_start << std::endl;
// case 2: only 1 TCP connection, sending "00" 20,000 times every 10 s; it costs 0.015 s.
for (auto &it : kSessions) {
  std::clock_t clock_start = std::clock();
  for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
  std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
            << std::endl;
}

Case 1:

% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
56.32   29.519892           9   3135415    408444 futex
20.53   10.762191           7   1490532           epoll_ctl
15.25    7.992391          11    715355           writev
3.98    2.086553       45360        46           nanosleep
1.86    0.973074          11     85273         1 epoll_wait
0.62    0.324022           8     39267     19266 accept4
0.58    0.305246           6     48721           read
0.55    0.286858           6     48762           write
0.30    0.154980           4     40004           setsockopt
0.01    0.006486           5      1216           mprotect
0.01    0.002952          21       143           madvise
0.00    0.001018           7       152           brk
0.00    0.000527           6        94           clock_gettime
0.00    0.000023           3         8           openat
0.00    0.000021          21         1           mremap
0.00    0.000010           0        22           mmap
0.00    0.000007           1         9           close
0.00    0.000000           0         8           fstat
0.00    0.000000           0         3           munmap
0.00    0.000000           0         4           rt_sigaction
0.00    0.000000           0         1           rt_sigprocmask
0.00    0.000000           0         1           ioctl
0.00    0.000000           0         1           readv
0.00    0.000000           0         8         8 access
0.00    0.000000           0         1           socket
0.00    0.000000           0         1           bind
0.00    0.000000           0         1           listen
0.00    0.000000           0         1           clone
0.00    0.000000           0         1           execve
0.00    0.000000           0         4           getuid
0.00    0.000000           0         4           getgid
0.00    0.000000           0         4           geteuid
0.00    0.000000           0         4           getegid
0.00    0.000000           0         1           arch_prctl
0.00    0.000000           0         1           set_tid_address
0.00    0.000000           0         2           set_robust_list
0.00    0.000000           0         1           eventfd2
0.00    0.000000           0         1           epoll_create1
0.00    0.000000           0         1           pipe2
0.00    0.000000           0         1           prlimit64
------ ----------- ----------- --------- --------- ----------------
100.00   52.416251               5605075    427719 total

Case 2:

% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
66.66    0.151105           7     22506      3469 futex
9.74    0.022084           6      3709         1 epoll_wait
9.54    0.021624           4      5105           epoll_ctl
9.47    0.021466           8      2550           writev
2.47    0.005598           4      1263           write
1.70    0.003857           3      1246           read
0.18    0.000409          18        23           nanosleep
0.09    0.000197           4        46           clock_gettime
0.03    0.000068           4        16           mprotect
0.02    0.000035           2        21           mmap
0.01    0.000024           8         3           munmap
0.01    0.000019          10         2         1 accept4
0.01    0.000018           5         4           setsockopt
0.01    0.000015           8         2           set_robust_list
0.01    0.000014           4         4           rt_sigaction
0.01    0.000014           4         4           geteuid
0.01    0.000013           3         4           getgid
0.01    0.000012           3         4           getuid
0.01    0.000012           3         4           getegid
0.00    0.000011           1         8           fstat
0.00    0.000010          10         1           socket
0.00    0.000008           8         1           clone
0.00    0.000007           2         3           brk
0.00    0.000007           7         1           pipe2
0.00    0.000006           1         7           openat
0.00    0.000006           6         1           epoll_create1
0.00    0.000005           1         8         8 access
0.00    0.000005           5         1           bind
0.00    0.000005           5         1           eventfd2
0.00    0.000005           5         1           prlimit64
0.00    0.000004           1         7           close
0.00    0.000004           4         1           listen
0.00    0.000003           3         1           rt_sigprocmask
0.00    0.000003           3         1           arch_prctl
0.00    0.000003           3         1           set_tid_address
0.00    0.000000           0         1           execve
------ ----------- ----------- --------- --------- ----------------
100.00    0.226676                 36561      3479 total

How to improve libevent bufferevent_write performance

Read libevent's documentation, study its source code, and consider other event loop libraries such as libev, Qt, Wt, libonion, POCO, etc.

Note the following points. I assume a modern Linux/x86-64 system.

  • You could profile your open-source event loop library (e.g., by compiling it from source with a recent GCC using the -pg -O2 flags, then using strace(1) and/or gprof(1) and/or perf(1) and/or time(1), together with top(1), ps(1), proc(5), netstat(8), ip(8), ifconfig(8), tcpdump(8), and xosview to observe your entire Linux system). Of course, read time(7), epoll(7), and poll(2).

  • TCP/IP introduces some overhead, IP routing adds more, and a typical Ethernet packet carries at least hundreds of bytes (with dozens of bytes of overhead). You certainly want to send(2) or recv(2) several hundred bytes at a time; sending short "00" messages (about four bytes of useful payload) is inefficient. Make sure your application sends messages of hundreds of bytes at once. You might consider some JSONRPC approach (and, of course, design your protocol at a higher level, with fewer but larger messages each triggering more complex behavior) or some MPI approach. One way to send fewer but higher-level messages is to embed an interpreter such as Guile or Lua and send higher-level script chunks or requests (as NeWS did in the past, and as PostgreSQL or exim do today).

  • For short-distance, small-volume communication, prefer running several processes or threads on the same machine, and use mq_overview(7), pipe(7), fifo(7), and unix(7) to avoid Ethernet.

  • Most computers in 2020 are multi-core, so with care you could use Pthreads or std::thread (with one thread running on each core, so at least 2 to 4 different threads on a laptop, or a hundred on a powerful Linux server). You would need some synchronization code (e.g., std::mutex with std::lock_guard, or Pthread mutexes...).

  • Be aware of the C10K problem, and take inspiration from existing open-source server programs or libraries such as lighttpd, Wt, FLTK, REDIS, Vmime, libcurl, libonion (study their source code, and observe their runtime behavior with gdb(1) and/or strace(1) or ltrace(1)).

  • The network might be the bottleneck (in which case you cannot gain performance by improving your code; you would need changes in your software architecture). Read more about cloud computing, distributed computing, XDR, ASN.1, SOAP, REST, web services, libssh, and the π-calculus.

Notice:

static void handler(int sig) {
  std::cout << "get signal: " << sig << std::endl;
  stop();
}

is against the rules of signal-safety(7) when installed with signal(7), so you may want to use the pipe(7)-to-self trick (as Qt recommends), or consider using the Linux-specific signalfd(2) system call.

Read also Advanced Linux Programming, then syscalls(2), socket(7), and tcp(7).
