我正在尝试编写一个缓冲区,该缓冲区可以将数据推到缓冲区,检查是否已满,如有必要,请交换缓冲区。另一个线程可以获取文件输出的缓冲区。
我已经成功实现了缓冲区,但是我想添加一种迫使wapbuffer方法,该方法将迫使不完整的缓冲区交换并从不完整的缓冲区返回数据。为了做到这一点,我检查读取和写入缓冲区是否相同(试图强制施加缓冲区以写入文件,而还有其他可以写入的完整缓冲区)。我希望这种方法能够与GetBuffer方法并肩运行(不是真正的必要方法,但我想尝试并偶然发现了这个问题)。
getBuffer会阻塞,当porcestwapbuffer完成后,它仍将阻止,直到新的缓冲区完全填满,因为在porcestwapbuffer中,我更改了原子_Read_buffer_index。我想知道这是否总是有效吗?GetBuffer的阻塞锁是否会检测到Atomic Read_buffer_index的更改并更改静音的误差,或者在锁定开始时检查一下它必须锁定的mutex并继续尝试锁定相同的utex即使索引,即使是索引更改?
/* selection of member data */
unsigned int _size, _count;
std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;
std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr< std::mutex[] > _mutexes;
std::recursive_mutex _force_swap_buffer;
/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_index + length <= _size) {
memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
_index += length;
} else {
memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
unsigned int t_index = _index;
SwapBuffer();
Push(&data[_size - t_index], length - (_size - t_index));
}
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
std::unique_ptr<T[]> result(new T[_size]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
_read_buffer_index = (_read_buffer_index + 1) % _count;
return std::move(result);
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time
if (_write_buffer_index != _read_buffer_index)
return nullptr;
std::unique_ptr<T[]> result(new T[_index]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));
unsigned int next = (_write_buffer_index + 1) % _count;
_mutexes[next].lock();
_read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
_mutexes[_write_buffer_index].unlock();
_write_buffer_index = next;
_index = 0;
return result;
}
您的代码存在一些问题。首先,在修改原子变量时要小心。只有一小部分操作是原子能的(请参阅http://en.cppreference.com/w/cpp/atomic/atomic),原子操作的组合不是原子。考虑:
_read_buffer_index = (_read_buffer_index + 1) % _count;
这里发生的事情是您对变量,增量,Modulo操作和原子商店的原子读取。但是,整个语句本身不是不是原子!如果_count
是2的功率,则只需使用++
操纵器即可。如果不是这样,则必须将_read_buffer_index
读取为临时变量,执行上述计算,然后使用compare_exchange
函数存储新值如果变量在平均时间中没有更改。显然,后者必须在循环中完成,直到成功为止。您还必须担心一个线程会在第二个线程的读取和compare_exchange之间递增变量_count
倍,在这种情况下,第二个线程错误地认为该变量不会更改。
第二个问题是缓存线弹跳。如果您在同一缓存线上有多个静音线,那么如果两个或多个线程尝试同时访问它们,则性能将非常糟糕。缓存线的大小取决于您的平台。
主要问题是,虽然ForceSwapBuffer()
和Push()
都锁定_force_swap_buffer
MUTEX,但GetBuffer()
却没有。GetBuffer()
确实更改_read_buffer_index
。因此,在ForceSwapBuffer()
中:
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_write_buffer_index != _read_buffer_index)
return nullptr;
// another thread can call GetBuffer() here and change _read_buffer_index
// rest of the code here
if
统计后_write_buffer_index == _read_buffer_index
的假设实际上是无效的。