提升的线程安全性::unordered_map<int、结构>和shared_mutex



我正试图解析来自4个线程的套接字的ts流数据。我决定使用boost共享互斥来管理连接和数据接收。但我完全是c++的新手,我不确定我是否能正确地使用踏面安全。我正在使用boost unordered_map<int,>,当一个新用户正在连接时,我用唯一的锁锁定互斥锁并将用户添加到映射中,当该用户断开连接时,我用唯一的锁锁定互斥锁并将其从映射中删除。TsStreams结构包含向量和一些额外的变量,而用户发送数据,我使用共享锁从映射中获取用户的TsStreams引用,向向量添加新数据并修改其他变量。以这种方式修改TsStreams是线程安全的吗?

class Demuxer {
public:
Demuxer();
typedef signal<void (int, TsStream)> PacketSignal;
void onUserConnected(User);
void onUserDisconnected(int);
void onUserData(Data);
void addPacketSignal(const PacketSignal::slot_type& slot);
private:
mutable PacketSignal packetSignal;
void onPacketReady(int, TsStream);
TsDemuxer tsDemuxer;
boost::unordered_map<int, TsStreams> usersData;
boost::shared_mutex mtx_;
};
#include "Demuxer.h"
Demuxer::Demuxer() {
tsDemuxer.addPacketSignal(boost::bind(&Demuxer::onPacketReady, this, _1, _2));
}
void Demuxer::onUserConnected(User user){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(user.socket)){
usersData.erase(user.socket);
}
TsStreams streams;
streams.video.isVideo = true;
usersData.insert(std::make_pair(user.socket, streams));
}
void Demuxer::onUserDisconnected(int socket){
boost::unique_lock<boost::shared_mutex> lock(mtx_);
if(usersData.count(socket)){
usersData.erase(socket);
}
}
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> lock(mtx_);
if(!usersData.count(data.socket)){
return;
}
tsDemuxer.parsePacket(data.socket, std::ref(usersData.at(data.socket)), (uint8_t *) data.buffer, data.length);
}
void Demuxer::onPacketReady(int socket, TsStream data) {
packetSignal(socket, data);
}
void Demuxer::addPacketSignal(const PacketSignal::slot_type& slot){
packetSignal.connect(slot);
}
struct TsStreams{
TsStreams() = default;
TsStreams(const TsStreams &p1) {}
TsStream video;
TsStream audio;
};
struct TsStream
{
TsStream() = default;
TsStream(const TsStream &p1) {}
boost::recursive_mutex mtx_; // to make sure to have the queue, it may not be necessary
uint64_t PTS = 0;
uint64_t DTS = 0;
std::vector<char> buffer;
uint32_t bytesDataLength = 0;
bool isVideo = false;
};
class TsDemuxer {
public:
typedef signal<void (int, TsStream)> PacketSignal;
void parsePacket(int socket, TsStreams &streams, uint8_t *data, int size);
connection addPacketSignal(const PacketSignal::slot_type& slot);
private:
PacketSignal packetSignal;
void parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size);
void parseAdaptationField(BitReader &bitReader);
void parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator);
void parsePES(TsStream &stream, BitReader &bitReader);
int64_t parseTSTimestamp(BitReader &bitReader);
};
void TsDemuxer::parsePacket(int socket, TsStreams &streams, uint8_t *data, int size) {
//some parsing
if(video){
streams.video.mtx_.lock();
parseTSPacket(socket, streams.video, (uint8_t *)buf, 188);
}else{
streams.audio.mtx_.lock();
parseTSPacket(socket, streams.audio, (uint8_t *)buf, 188);
}
}
void TsDemuxer::parseTSPacket(int socket, TsStream &stream, uint8_t *data, int size)
{
//some more parsing
parseStream(socket, stream, bitReader, payload_unit_start_indicator);
}
void TsDemuxer::parseStream(int socket, TsStream &stream, BitReader &bitReader, uint32_t payload_unit_start_indicator) {
if(payload_unit_start_indicator)
{
if(!stream.buffer.empty()){
packetSignal(socket, stream);
stream.buffer = vector<char>();
stream.bytesDataLength = 0;
}
parsePES(stream, bitReader);
}
size_t payloadSizeBytes = bitReader.numBitsLeft() / 8;
copy(bitReader.getBitReaderData(), bitReader.getBitReaderData()+payloadSizeBytes,back_inserter(stream.buffer));
stream.mtx_.unlock();
}

对我来说这个demuxer看起来是正确的。但也有一些效率低下的地方:

  1. 您不需要在erase之前进行count。只是抹去。如果一个元素不存在,这将不执行任何操作。这样可以节省一次查找。同样,不要在count之后再使用at。使用find(参见下面的用法)。

  2. 你可能想把尽可能多的工作移出临界区。例如,在onUserConnected中,您可以在获取锁之前创建TsStreams对象。

  3. 注意,更改无序映射永远不会使指向映射中元素的指针或引用失效,除非它们被擦除。这意味着在onUserData中,您不必在解析数据包时持有映射上的锁。

也就是说,假设您没有从两个不同的线程中为同一个用户调用onUserData。您可以通过在TsStream对象中引入第二个锁来防止这种情况。同样,您应该防止在另一个线程仍可能解析最后一个数据包时擦除元素。我会使用shared_ptr。像这样:

class Demuxer {
...
boost::unordered_map<int, boost::shared_ptr<TsStreams> > usersData;
boost::shared_mutex mtx_;
};
void Demuxer::onUserData(Data data) {
boost::shared_lock<boost::shared_mutex> maplock(mtx_);
auto found = usersData.find(data.socket);
if(found == usersData.end())
return;
boost::shared_ptr<TsStreams> stream = found->second;
boost::unique_lock<boost::recursive_mutex> datalock(stream->mtx_);
maplock.unlock();
tsDemuxer.parsePacket(data.socket, *stream, (uint8_t *) data.buffer, data.length);
}

如果您使用这种方法减少了使用Demuxer锁的时间,那么您可能应该将共享互斥锁替换为普通互斥锁。共享互斥锁的开销要高得多,对于这么短的临界区来说不值得使用。

TsDemuxer看起来有点不稳定:

TsDemuxer::parsePacket中,你永远不会解锁互斥锁。那不应该是unique_lock吗?同样地,在parseStream中,解锁似乎是不配对的。一般来说,与手动锁定和解锁相比,使用unique_lock对象总是一种可行的方法。如果有的话,锁定和解锁unique_lock,而不是互斥锁。

与多线程无关的备注

  1. stream.buffer.clear()stream.buffer = vector<char>()更有效,因为这将重用缓冲区内存,而不是完全释放它。

  2. 正如其他人所注意到的,boost的这些部分现在是标准库的一部分。将boost::替换为std::,并启用最新的c++标准,如c++ 14或c++ 17,就可以了。在最坏的情况下,你不得不将shared_mutex替换为shared_timed_mutex

  3. 在Demuxer中,您按值传递User和Data对象。你确定这些不应该是const引用吗?