具有并发性的漏桶算法



试图模拟一个场景,其中多个线程正在创建流量来填充bucket&以规定的速率使铲斗泄漏的螺纹。但是,代码正在进入死锁。你能复习一下这段代码吗?如果你看到任何错误,请告诉我;我应该补充的最好的修改。

代码

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <chrono>
using namespace std;
class LeakyBucket {
public:
LeakyBucket(int size, int rate) : maxCapacity(size), leakRate(rate), filled(0)  {}
void add(int newDataSize) {
unique_lock<mutex> lk(_mtx);
_cond.wait(lk, [this](){
return  filled<=maxCapacity;
});

filled = (filled+newDataSize) > maxCapacity ? maxCapacity:(filled+newDataSize);
cout<<"n Filled bucket with : "<<newDataSize;
cout<<"n Filled: "<<filled<<"n ----------";
_cond.notify_one();
}
void leak() {
while(1) {
{
unique_lock<mutex> lk(_mtx);
_cond.wait(lk, [this]() {
return filled > 0 || _done;
});
if(_done)
break;
filled = (filled-leakRate<0) ? 0 : (filled-leakRate);
cout << "n Leaked bucket with leakRate";
cout << "n BucketFilledRemain: " << filled << "n ----------";
_cond.notify_one();
}
_sleep:
this_thread::sleep_for(chrono::seconds(1));
}
}
bool _done = false;
private:
atomic<int> filled;
int maxCapacity;
int leakRate; // Per second
mutex _mtx;
condition_variable _cond;
};
void runLeakyBucketAlgorithm() {
LeakyBucket *lb = new LeakyBucket(30, 20);
thread t1(&LeakyBucket::leak, lb);
thread t2([&](){
for(int i=0; i<10; i++) {
cout<<"n launching thread: "<<i;
lb->add(rand()%40);
}
this_thread::sleep_for(chrono::seconds(5));
lb->_done = true;
});
if(t2.joinable()) {
t2.join();
}
t1.join();
}

O/p:

launching thread: 0
Filled bucket with : 7
Filled: 7
----------
launching thread: 1
Filled bucket with : 9
Filled: 16
----------
launching thread: 2
Leaked bucket with leakRate
BucketFilledRemain: 0
----------
Filled bucket with : 33
Filled: 30
----------
launching thread: 3
Filled bucket with : 18
Filled: 30
----------
launching thread: 4
Filled bucket with : 10
Filled: 30
----------
launching thread: 5
Filled bucket with : 32
Filled: 30
----------
launching thread: 6
Filled bucket with : 24
Filled: 30
----------
launching thread: 7
Filled bucket with : 38
Filled: 30
----------
launching thread: 8
Filled bucket with : 3
Filled: 30
----------
launching thread: 9
Filled bucket with : 29
Filled: 30
----------
Leaked bucket with leakRate
BucketFilledRemain: 10
----------
Leaked bucket with leakRate
BucketFilledRemain: 0

所示代码中存在多个基本错误。

thread t1(&LeakyBucket::leak, lb);

leak()将等待,直到铲斗的填充率至少为0,然后从中减去泄漏率。然后就完成了。就这样,不会再有了。泄漏的螺纹将不复存在。它将成为一个线程。它将永远怀念峡湾。一旦水桶泄漏一次,它的泄漏孔就会被堵住,它就变成了一个完全防漏的水桶。

new LeakyBucket(30, 20);

铲斗的容量为30,泄漏率为20。

lb->add(rand()%40);

这会被调用十次,添加0到39滴水。

比方说,我们第一次往桶里滴20滴水。泄漏的线程会醒来,带走那20滴水,并赢得它应得的退休生活。

但是等一下,我们还有九杯水要来了!

add()的第二次呼叫将滴下25滴水。第三次尝试添加30滴水。铲斗现在已超负荷。对add()的第四次调用现在将永远阻塞,因为正如我们刚刚看到的,bucket现在完全是防漏的。

这是第一个错误:bucket泄漏一次,然后就不再泄漏了。

_cond.wait(lk, [this]() {
return filled > 0;
});
filled -= leakRate;

水桶中的泄漏将等到水桶中至少有1滴水,然后泄漏20滴水。所以,如果桶里已经有五滴水了,那么在所有这些之后,桶里的水就会变成负十五滴水。这显然毫无意义,所以这将是第二个需要修复的错误,然后才能正常工作。

这里可能还有第三个bug。铲斗被定义为具有一定的容量。然而,在我上面的一个例子中,水桶的水滴最终超过了它所说的容量。这也算不上什么。

最新更新