在我的测试用例中,只发送消息"00";消息通过bufferevent_write发送。
情况1:有20000个TCP连接,每10秒向每个连接发送一次"00",耗时0.15秒。
情况2:只有1个TCP连接,每10秒向它发送20000次"00",耗时0.015秒。
请给我一些提高bufferevent_write性能的建议。
我只是想让它尽可能快,并且想知道:既然bufferevent_write是异步的,为什么向1个TCP连接发送2万条消息,要比向2万个TCP连接各发送1条消息快得多。
CPU info:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
Stepping: 7
CPU MHz: 2500.000
BogoMIPS: 5000.00
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 36608K
NUMA node0 CPU(s): 0-15
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl cpuid tsc_known_freq pni pclmulqdq monitor ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat avx512_vnni
Memory info:
32G
整个测试用例
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/thread.h>
#include <netinet/tcp.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstring>
#include <ctime>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <thread>
// Enables chrono duration literals such as `10s` in this translation unit.
using namespace std::chrono_literals;

// Process-wide state shared between the event-loop thread and the sender thread.

// The libevent base that drives the listener and every accepted connection.
static event_base *kEventBase = nullptr;
// Listening socket wrapper; created in main() and disabled by stop().
static evconnlistener *kListener = nullptr;
// One bufferevent per live client connection.
static std::set<bufferevent *> kSessions;
// Mutex taken around accesses to kSessions from the sender thread and the event callback.
static std::mutex kSessionsMutex;
// True while the sender thread should keep broadcasting.
static std::atomic_bool kRunning(false);
/// Requests shutdown: clears the running flag, stops accepting new connections and
/// asks the event loop to exit one second from now.
static void stop() {
  kRunning.store(false);

  if (kListener != nullptr) {
    evconnlistener_disable(kListener);
    std::cout << "normal listener stopped" << std::endl;
  }

  if (kEventBase == nullptr) {
    return;
  }

  // Delay handed to event_base_loopexit(): one second.
  const timeval exit_delay = {1, 0};
  event_base_loopexit(kEventBase, &exit_delay);
}
/// Logs the received signal number and then triggers shutdown through stop().
///
/// NOTE(review): this function does stream I/O and calls stop(), neither of which is
/// async-signal-safe; it is only safe when invoked from ordinary thread context.
static void handler(int sig) {
  // '\n' followed by an explicit flush is what std::endl does.
  std::cout << "get signal: " << sig << '\n' << std::flush;
  stop();
}
/// Read callback: drains everything currently buffered on the connection's input
/// and logs it.
///
/// @param event  the bufferevent whose input buffer has data available.
///
/// The payload is copied into a heap-allocated std::string instead of a
/// variable-length stack array: VLAs are not standard C++, and their size here is
/// controlled by the remote peer, so a large message could overflow the stack.
/// The data is removed straight from the input buffer; no temporary evbuffer is needed.
static void ReadCallback(bufferevent *event, void *) {
  evbuffer *input = bufferevent_get_input(event);
  const size_t pending = evbuffer_get_length(input);

  std::string data(pending, '\0');
  if (pending > 0) {
    // evbuffer_remove() returns the number of bytes copied, or -1 on failure.
    const int removed = evbuffer_remove(input, &data[0], pending);
    data.resize(removed > 0 ? static_cast<size_t>(removed) : 0);
  }

  // c_str() keeps the original output format: printing stops at the first NUL byte.
  std::cout << "get data: " << data.c_str() << std::endl;
}
/// Event callback: logs why the connection was reported (EOF, error, timeout or
/// anything else), then removes the session from kSessions and frees it.
///
/// @param event   the bufferevent the event was raised on.
/// @param events  bitmask of BEV_EVENT_* flags describing what happened.
///
/// NOTE(review): the bufferevents are created with BEV_OPT_THREADSAFE and without
/// BEV_OPT_UNLOCK_CALLBACKS, so libevent presumably invokes this callback while it
/// holds the bufferevent lock; the sender thread takes kSessionsMutex first and the
/// bufferevent lock second (inside bufferevent_write). Confirm this lock-order
/// inversion cannot deadlock under load.
static void EventCallback(bufferevent *event, short events, void *) {
  if (events & BEV_EVENT_EOF) {
    std::cout << "socket EOF" << std::endl;
  } else if (events & BEV_EVENT_ERROR) {
    // Terminate and flush the line like every other branch; previously the message
    // was left unterminated and ran into the next log line.
    std::cout << "socket error: " << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
              << std::endl;
  } else if (events & BEV_EVENT_TIMEOUT) {
    std::cout << "socket read/write timeout" << std::endl;
  } else {
    std::cout << "unhandled socket events: " << events << std::endl;
  }

  // Unregister and free under the mutex so the sender thread never sees a freed session.
  std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
  kSessions.erase(event);
  bufferevent_free(event);
}
/// Accept callback: wraps the newly accepted socket in a thread-safe bufferevent,
/// configures TCP options, installs the read/event callbacks and registers the
/// session in kSessions.
///
/// @param socket  the file descriptor of the accepted connection.
static void listenerCallback(evconnlistener *, evutil_socket_t socket, sockaddr *, int, void *) {
  bufferevent *event =
      bufferevent_socket_new(kEventBase, socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
  if (event == nullptr) {
    std::cout << "create buffer event failed" << std::endl;
    // Nothing owns the accepted descriptor yet, so close it here to avoid leaking it.
    evutil_closesocket(socket);
    return;
  }

  // Best-effort latency tuning; failures are deliberately ignored.
  int enable = 1;
  setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable));
  setsockopt(socket, IPPROTO_TCP, TCP_QUICKACK, &enable, sizeof(enable));

  bufferevent_setcb(event, ReadCallback, nullptr, EventCallback, nullptr);
  bufferevent_enable(event, EV_WRITE | EV_READ);

  // The sender thread iterates kSessions under kSessionsMutex, so the insertion must
  // hold the same mutex; an unsynchronized insert is a data race on the std::set.
  std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
  kSessions.emplace(event);
}
int main(int argc, const char **argv) {
signal(SIGTERM, handler);
signal(SIGINT, handler);
evthread_use_pthreads();
// init
kEventBase = event_base_new();
if (kEventBase == nullptr) {
std::cout << "cannot create event_base_miner_listener_" << std::endl;
return -1;
}
sockaddr_in local_sin{};
bzero(&local_sin, sizeof(local_sin));
local_sin.sin_family = AF_INET;
local_sin.sin_port = htons(1800u);
local_sin.sin_addr.s_addr = htonl(INADDR_ANY);
kListener = evconnlistener_new_bind(kEventBase,
listenerCallback,
nullptr,
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE,
-1,
reinterpret_cast<sockaddr *>(&local_sin),
static_cast<int>(sizeof(local_sin)));
if (kListener == nullptr) {
std::cout << "cannot create normal listener" << std::endl;
return -1;
}
kRunning = true;
std::thread thread_send_message([]() {
while (kRunning) {
{
// case 1: If send to 20,000 tcp connection, and send "00" for each, it will cost 0.15s.
std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
std::clock_t clock_start = std::clock();
for (auto &it : kSessions) { bufferevent_write(it, "00", 2); }
std::cout << "send message to all done, client count: " << kSessions.size()
<< ", elapsed: " << std::clock() - clock_start << std::endl;
}
{
// case 2: If send to 1 tcp connection, and send "00" 20,000 times, it will cost 0.015s.
// std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
// for (auto &it : kSessions) {
// std::clock_t clock_start = std::clock();
// for (int i = 0; i < 20000; ++i) { bufferevent_write(it, "00", 2); }
// std::cout << "send message 20k times done, elapsed: " << std::clock() - clock_start
// << std::endl;
// }
}
std::this_thread::sleep_for(10s);
}
});
event_base_dispatch(kEventBase);
if (thread_send_message.joinable()) { thread_send_message.join(); }
{
std::lock_guard<std::mutex> local_lock_guard{kSessionsMutex};
for (auto &it : kSessions) { bufferevent_free(it); }
kSessions.clear();
}
if (kListener != nullptr) {
evconnlistener_free(kListener);
kListener = nullptr;
}
if (kEventBase != nullptr) {
event_base_free(kEventBase);
kEventBase = nullptr;
}
}
最小可复现示例
// Case 1: 20,000 TCP connections, one "00" written to each every 10 s; costs about 0.15 s.
const std::clock_t broadcast_begin = std::clock();
for (bufferevent *session : kSessions) {
  bufferevent_write(session, "00", 2);
}
std::cout << "send message to all done, client count: " << kSessions.size()
          << ", elapsed: " << (std::clock() - broadcast_begin) << std::endl;

// Case 2: a single TCP connection, "00" written 20,000 times every 10 s; costs about 0.015 s.
for (bufferevent *session : kSessions) {
  const std::clock_t burst_begin = std::clock();
  int remaining = 20000;
  while (remaining-- > 0) {
    bufferevent_write(session, "00", 2);
  }
  std::cout << "send message 20k times done, elapsed: " << (std::clock() - burst_begin)
            << std::endl;
}
案例1:
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
56.32 29.519892 9 3135415 408444 futex
20.53 10.762191 7 1490532 epoll_ctl
15.25 7.992391 11 715355 writev
3.98 2.086553 45360 46 nanosleep
1.86 0.973074 11 85273 1 epoll_wait
0.62 0.324022 8 39267 19266 accept4
0.58 0.305246 6 48721 read
0.55 0.286858 6 48762 write
0.30 0.154980 4 40004 setsockopt
0.01 0.006486 5 1216 mprotect
0.01 0.002952 21 143 madvise
0.00 0.001018 7 152 brk
0.00 0.000527 6 94 clock_gettime
0.00 0.000023 3 8 openat
0.00 0.000021 21 1 mremap
0.00 0.000010 0 22 mmap
0.00 0.000007 1 9 close
0.00 0.000000 0 8 fstat
0.00 0.000000 0 3 munmap
0.00 0.000000 0 4 rt_sigaction
0.00 0.000000 0 1 rt_sigprocmask
0.00 0.000000 0 1 ioctl
0.00 0.000000 0 1 readv
0.00 0.000000 0 8 8 access
0.00 0.000000 0 1 socket
0.00 0.000000 0 1 bind
0.00 0.000000 0 1 listen
0.00 0.000000 0 1 clone
0.00 0.000000 0 1 execve
0.00 0.000000 0 4 getuid
0.00 0.000000 0 4 getgid
0.00 0.000000 0 4 geteuid
0.00 0.000000 0 4 getegid
0.00 0.000000 0 1 arch_prctl
0.00 0.000000 0 1 set_tid_address
0.00 0.000000 0 2 set_robust_list
0.00 0.000000 0 1 eventfd2
0.00 0.000000 0 1 epoll_create1
0.00 0.000000 0 1 pipe2
0.00 0.000000 0 1 prlimit64
------ ----------- ----------- --------- --------- ----------------
100.00 52.416251 5605075 427719 total
案例2:
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
normal listener stopped
66.66 0.151105 7 22506 3469 futex
9.74 0.022084 6 3709 1 epoll_wait
9.54 0.021624 4 5105 epoll_ctl
9.47 0.021466 8 2550 writev
2.47 0.005598 4 1263 write
1.70 0.003857 3 1246 read
0.18 0.000409 18 23 nanosleep
0.09 0.000197 4 46 clock_gettime
0.03 0.000068 4 16 mprotect
0.02 0.000035 2 21 mmap
0.01 0.000024 8 3 munmap
0.01 0.000019 10 2 1 accept4
0.01 0.000018 5 4 setsockopt
0.01 0.000015 8 2 set_robust_list
0.01 0.000014 4 4 rt_sigaction
0.01 0.000014 4 4 geteuid
0.01 0.000013 3 4 getgid
0.01 0.000012 3 4 getuid
0.01 0.000012 3 4 getegid
0.00 0.000011 1 8 fstat
0.00 0.000010 10 1 socket
0.00 0.000008 8 1 clone
0.00 0.000007 2 3 brk
0.00 0.000007 7 1 pipe2
0.00 0.000006 1 7 openat
0.00 0.000006 6 1 epoll_create1
0.00 0.000005 1 8 8 access
0.00 0.000005 5 1 bind
0.00 0.000005 5 1 eventfd2
0.00 0.000005 5 1 prlimit64
0.00 0.000004 1 7 close
0.00 0.000004 4 1 listen
0.00 0.000003 3 1 rt_sigprocmask
0.00 0.000003 3 1 arch_prctl
0.00 0.000003 3 1 set_tid_address
0.00 0.000000 0 1 execve
------ ----------- ----------- --------- --------- ----------------
100.00 0.226676 36561 3479 total
如何提高libevent bufferevent_write性能
阅读libevent的文档,研究其源代码,并考虑其他事件循环库,如libev、Qt、Wt、libonion、POCO等……
请注意以下几点。我假设是一个现代的Linux/x86-64系统
-
您可以对所使用的开源事件循环库进行性能分析(例如,使用最新的GCC从源代码编译它,并加上
-pg -O2
标志,然后使用strace(1)和/或gprof(1)、perf(1)、time(1),以及top(1)、ps(1)、proc(5)、netstat(8)、ip(8)、ifconfig(8)、tcpdump(8)和xosview来观察整个Linux系统)。当然,还要阅读time(7)、epoll(7)和poll(2)。 -
TCP/IP会引入一些开销,IP路由会增加更多的开销,典型的以太网数据包至少有数百个字节(以及数十个字节的开销)。您当然希望每次send(2)或recv(2)都传输几百个字节。发送短的
"00"
消息(大约四个字节的有效载荷)效率很低。请确保您的应用程序每次发送数百字节的消息。您可以考虑某种JSONRPC方法(当然,也要在更高层次上设计您的协议,使用更少但更大的消息,让每条消息触发更复杂的行为)或某种MPI方法。发送更少但更高层次消息的一种方法是嵌入某个解释器,如Guile或Lua,并发送更高层次的脚本片段或请求(就像过去的NeWS,以及现在的PostgreSQL或exim那样)。 -
对于短距离的小数据量通信,更好的做法是在同一台计算机上运行多个进程或线程,并使用mq_overview(7)、pipe(7)、fifo(7)和unix(7)来避开以太网。
-
在2020年,大多数计算机都是多核的。只要足够小心,您可以使用Pthreads或
std::thread
(每个核心上运行一个线程,因此笔记本电脑上至少有2到4个不同的线程,而强大的Linux服务器上可以有上百个线程)。您需要一些同步代码(例如std::mutex
与std::lock_guard
或Pthread互斥锁……)。 -
了解C10K问题,并从现有的开源服务器程序或库中获得灵感,如lighttpd、Wt、FLTK、REDIS、Vmime、libcurl、libonion(研究它们的源代码,并使用gdb(1)和/或strace(1)或ltrace(1)观察它们的运行时行为)。
-
网络可能是瓶颈(那样的话,您无法通过改进代码来提升性能,而需要对软件体系结构做一些更改)。请阅读更多有关云计算、分布式计算、XDR、ASN.1、SOAP、REST、Web服务、libssh、π演算的资料。
注意:
// Signal handler quoted from the test program above: prints the signal number via
// std::cout and then calls stop(). Shown here as the example of a handler that does
// work which is not async-signal-safe.
static void handler(int sig) {
std::cout << "get signal: " << sig << std::endl;
stop();
}
如果与signal(7)一起使用,则违反了signal-safety(7)的规则,因此您可以采用Qt建议的技巧:使用pipe(7)向自身发送通知,或者考虑使用Linux特有的signalfd(2)系统调用。
另请阅读《高级Linux编程》(Advanced Linux Programming),然后阅读syscalls(2)、socket(7)和tcp(7)。