我有一个字节数组(unsigned char *
),必须转换为整数。整数用三个字节表示。这就是我所做的
//bytes array is allocated and filled
//allocating space for intBuffer (uint32_t)
unsigned long i = 0;
uint32_t number;
for(; i<size_tot; i+=3){
uint32_t number = (bytes[i]<<16) | (bytes[i+1]<<8) | bytes[i+2];
intBuffer[number]++;
}
这段代码做得很好,但由于内存中有三次访问,速度非常慢(尤其是对于size_tot
的大值,按3000000
的顺序)。有没有一种方法可以更快地做到这一点并提高性能?
正确答案几乎总是:
编写正确的代码,启用优化,信任您的编译器
给定:
void count_values(std::array<uint32_t, 256^3>& results,
const unsigned char* from,
const unsigned char* to)
{
for(; from != to; from = std::next(from, 3)) {
++results[(*from << 16) | (*std::next(from, 1) << 8) | *(std::next(from,2))];
}
}
用-O3
编译
收益率(内联解释性注释):
__Z12count_valuesRNSt3__15arrayIjLm259EEEPKhS4_: ## @_Z12count_valuesRNSt3__15arrayIjLm259EEEPKhS4_
.cfi_startproc
## BB#0:
pushq %rbp
Ltmp0:
.cfi_def_cfa_offset 16
Ltmp1:
.cfi_offset %rbp, -16
movq %rsp, %rbp
Ltmp2:
.cfi_def_cfa_register %rbp
jmp LBB0_2
.align 4, 0x90
LBB0_1: ## %.lr.ph
## in Loop: Header=BB0_2 Depth=1
# dereference from and extend the 8-bit value to 32 bits
movzbl (%rsi), %eax
shlq $16, %rax # shift left 16
movzbl 1(%rsi), %ecx # dereference *(from+1) and extend to 32bits by padding with zeros
shlq $8, %rcx # shift left 8
orq %rax, %rcx # or into above result
movzbl 2(%rsi), %eax # dreference *(from+2) and extend to 32bits
orq %rcx, %rax # or into above result
incl (%rdi,%rax,4) # increment the correct counter
addq $3, %rsi # from += 3
LBB0_2: ## %.lr.ph
## =>This Inner Loop Header: Depth=1
cmpq %rdx, %rsi # while from != to
jne LBB0_1
## BB#3: ## %._crit_edge
popq %rbp
retq
.cfi_endproc
请注意,没有必要偏离标准构造或标准调用。编译器生成完美的代码。
为了进一步证明这一点,让我们疯狂地编写一个自定义迭代器,它允许我们将函数简化为:
void count_values(std::array<uint32_t, 256^3>& results,
byte_triple_iterator from,
byte_triple_iterator to)
{
assert(iterators_correct(from, to));
while(from != to) {
++results[*from++];
}
}
下面是这样一个迭代器的(基本)实现:
struct byte_triple_iterator
{
constexpr byte_triple_iterator(const std::uint8_t* p)
: _ptr(p)
{}
std::uint32_t operator*() const noexcept {
return (*_ptr << 16) | (*std::next(_ptr, 1) << 8) | *(std::next(_ptr,2));
}
byte_triple_iterator& operator++() noexcept {
_ptr = std::next(_ptr, 3);
return *this;
}
byte_triple_iterator operator++(int) noexcept {
auto copy = *this;
_ptr = std::next(_ptr, 3);
return copy;
}
constexpr const std::uint8_t* byte_ptr() const {
return _ptr;
}
private:
friend bool operator<(const byte_triple_iterator& from, const byte_triple_iterator& to)
{
return from._ptr < to._ptr;
}
friend bool operator==(const byte_triple_iterator& from, const byte_triple_iterator& to)
{
return from._ptr == to._ptr;
}
friend bool operator!=(const byte_triple_iterator& from, const byte_triple_iterator& to)
{
return not(from == to);
}
friend std::ptrdiff_t byte_difference(const byte_triple_iterator& from, const byte_triple_iterator& to)
{
return to._ptr - from._ptr;
}
const std::uint8_t* _ptr;
};
bool iterators_correct(const byte_triple_iterator& from,
const byte_triple_iterator& to)
{
if (not(from < to))
return false;
auto dist = to.byte_ptr() - from.byte_ptr();
return dist % 3 == 0;
}
现在我们有什么?
- 一个断言,用于检查我们的源确实是正确的长度(在调试构建中)
- 保证大小正确的输出结构
但是它对我们的目标代码做了什么?(用-O3 -DNDEBUG
编译)
.globl __Z12count_valuesRNSt3__15arrayIjLm259EEE20byte_triple_iteratorS3_
.align 4, 0x90
__Z12count_valuesRNSt3__15arrayIjLm259EEE20byte_triple_iteratorS3_: ## @_Z12count_valuesRNSt3__15arrayIjLm259EEE20byte_triple_iteratorS3_
.cfi_startproc
## BB#0:
pushq %rbp
Ltmp3:
.cfi_def_cfa_offset 16
Ltmp4:
.cfi_offset %rbp, -16
movq %rsp, %rbp
Ltmp5:
.cfi_def_cfa_register %rbp
jmp LBB1_2
.align 4, 0x90
LBB1_1: ## %.lr.ph
## in Loop: Header=BB1_2 Depth=1
movzbl (%rsi), %eax
shlq $16, %rax
movzbl 1(%rsi), %ecx
shlq $8, %rcx
orq %rax, %rcx
movzbl 2(%rsi), %eax
orq %rcx, %rax
incl (%rdi,%rax,4)
addq $3, %rsi
LBB1_2: ## %.lr.ph
## =>This Inner Loop Header: Depth=1
cmpq %rdx, %rsi
jne LBB1_1
## BB#3: ## %._crit_edge
popq %rbp
retq
.cfi_endproc
答:什么都没有-它同样高效。
教训?不真的!相信你的编译器!!!
假设你想对所有不同的值(你的代码:intBuffer[number]++;
)进行计数(intBuffer有2^24个项),你可以尝试进行一些循环展开:
代替:
for(; i<size_tot; i+=3){
uint32_t number = (bytes[i]<<16) | (bytes[i+1]<<8) | bytes[i+2];
intBuffer[number]++;
}
do:
for(; i<size_tot; i+=12){ // add extra ckeck here..
intBuffer[(bytes[i]<<16) | (bytes[i+1]<<8) | bytes[i+2]]++;
intBuffer[(bytes[i+3]<<16) | (bytes[i+4]<<8) | bytes[i+5]]++;
intBuffer[(bytes[i+6]<<16) | (bytes[i+7]<<8) | bytes[i+8]]++;
intBuffer[(bytes[i+9]<<16) | (bytes[i+10]<<8) | bytes[i+11]]++;
}
// Add a small loop for the remaining bytes (no multiple of 12)
这将允许cpu在一个时钟周期内执行多条指令(确保将编译器优化设置为最高级别)。
您还需要对bytes
的最后一部分进行额外检查。
查看指令流水线。
指令流水线是一种在单个处理器内实现并行形式的技术,称为指令级并行因此,它允许比在给定时钟速率下更快的CPU吞吐量(一个时间单位内可以执行的指令数量)。基本指令周期被分解为一个称为流水线的系列。与其按顺序处理每条指令(在开始下一条指令之前先完成一条指令),不如将每条指令拆分为一系列步骤,以便可以并行执行不同的步骤,并且可以同时处理指令(在结束前一条指令前开始一条指令。)。
更新:
但是非常慢
实际上,对于3MB来说,即使使用原始代码(考虑到数据已经缓存),这也应该是即时的。bytes
是如何定义的?可能是operator[]
在做一些额外的边界检查吗?
首先要确保编译器优化达到最高级别。
我想我会尝试一下:
unsigned char* pBytes = bytes;
uint32_t number;
for(unsigned long i = 0; i<size_tot; i+=3){
number = *pBytes << 16;
++pBytes;
number = number | (*pBytes << 8);
++pBytes;
number = number | *pBytes;
++pBytes;
++intBuffer[number];
}
编译后,我会检查生成的汇编程序代码的外观,看看更改后的代码是否真的有所不同。
尝试一次读取一个单词,然后提取所需的值。这应该比逐字节读取更有效率
以下是64位小端序系统的示例实现,它将一次读取3个64位值
void count(uint8_t* bytes, int* intBuffer, uint32_t size_tot)
{
assert(size_tot > 7);
uint64_t num1, num2, num3;
uint8_t *bp = bytes;
while ((uintptr_t)bp % 8) // make sure that the pointer is properly aligned
{
num1 = (bp[2] << 16) | (bp[1] << 8) | bp[0];
intBuffer[num1]++;
bp += 3;
}
uint64_t* ip = (uint64_t*)bp;
while ((uint8_t*)(ip + 2) < bytes + size_tot)
{
num1 = *ip++;
num2 = *ip++;
num3 = *ip++;
intBuffer[num1 & 0xFFFFFF]++;
intBuffer[(num1 >> 24) & 0xFFFFFF]++;
intBuffer[(num1 >> 48) | ((num2 & 0xFF) << 16)]++;
intBuffer[(num2 >> 8) & 0xFFFFFF]++;
intBuffer[(num2 >> 32) & 0xFFFFFF]++;
intBuffer[(num2 >> 56) | ((num3 & 0xFFFF) << 8)]++;
intBuffer[(num3 >> 16) & 0xFFFFFF]++;
intBuffer[num3 >> 40]++;
}
bp = (uint8_t*)ip;
while (bp < bytes + size_tot)
{
num1 = (bp[2] << 16) | (bp[1] << 8) | bp[0];
intBuffer[num1]++;
bp += 3;
}
}
您可以在编译器资源管理器上检查编译器输出。当然,聪明的编译器可能已经知道如何做到这一点,但大多数编译器并不知道。从Godbolt链接中可以看到,编译器将使用一堆movzx
来读取单独的字节,而不是读取整个寄存器。ICC将进行更多的循环展开,但Clang和GCC没有
类似地,对于32位体系结构,您还将在每次迭代中读取3个"单词"。此外,您可能需要进行一些手动循环展开,而不是依赖编译器来进行。下面是一个32位little-endian机器的例子。它可以很容易地适应像这样的大端
intBuffer[num1 >> 8]++;
intBuffer[((num1 & 0xFF) << 16) | (num2 >> 16)]++;
intBuffer[((num2 & 0xFFFF) << 8) | (num3 >> 24)]++;
intBuffer[num3 & 0xFFFFFF]++;
但是,为了获得更高的性能,您可能需要寻找类似SSE或AVX 的SIMD解决方案
以下是我在自己的项目中可以使用的内容(我使用了自定义错误代码而不是bool
):
template <std::endian endianness = std::endian::native, std::integral T>
constexpr auto parse_integer(std::span<std::uint8_t> buf, T& ret)
-> bool {
if (buf.size() < sizeof(T)) {
return false;
}
for (std::size_t i{0}; i < sizeof(T); ++i) {
if constexpr (endianness == std::endian::big) {
ret <<= 8U;
ret |= buf[i];
} else {
ret >>= 8U;
ret |= buf[i] << (8U * (sizeof(T) - 1));
}
}
return true;
}
请注意,该函数使用了一些C++20
特性,如std::endian
(适用于任何字节顺序!)和std::span
。C++20
概念的使用可以用简单的std::enable_if
s代替,跨度可以用模板参数代替,比如:
template <std::endian endianness = std::endian::native,
typename T,
typename Container,
typename = std::enable_if_t<std::is_integral_v<T>>>
inline constexpr auto parse_integer(const Container& buf, T& ret)
-> bool {
if (buf.size() < sizeof(T)) {
return false;
}
for (std::size_t i{0}; i < sizeof(T); ++i) {
if constexpr (endianness == std::endian::big) {
ret <<= 8U;
ret |= buf[i];
} else {
ret >>= 8U;
ret |= buf[i] << (8U * (sizeof(T) - 1));
}
}
return true;
}