为什么这个简单的生产者-消费者计划使用两个助推::纤维不能按预期工作



考虑以下带有ProducerConsumer的简化程序。每当int从前者可用时(在这种简单的情况下,每秒钟(,后者就会被通知。通知是通过将值推送到Consumer拥有的boost::fibers::unbuffered_channel中来完成的,一旦推送该值,预计CCD_5将从通道中弹出该值。推动和弹出是在两种不同的纤维中完成的。问题是,由于某些原因,Consumer会永远阻止pop((。你知道为什么吗?在不修改太多基于信号、通道和两条光纤的基本架构的情况下,可能的解决方案是什么?这里有一个完整源代码的链接,它打印了一些调试消息,以更好地理解程序流。如果你试图运行该程序,它会在一段时间不活动后自动停止运行。

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
using channel_t = boost::fibers::unbuffered_channel<int>;
namespace fibers = boost::fibers;
struct Producer {
boost::signals2::signal<void(int)> sig;
void notify() {
std::cout << "enter Producer::notify()n";
for (int i = 0; i < 10; i++ ) {
sig(i);
std::this_thread::sleep_for(std::chrono::seconds{1});
}
std::cout << "complete Producer::notify()n";
}
void start() {
std::cout << "enter Producer::start()n";
fibers::fiber(fibers::launch::dispatch, &Producer::notify, this).detach();
std::cout << "complete Producer::start()n";
}        
};
struct Consumer {
channel_t chan;
Consumer() {
std::cout << "enter Consumer()n";
fibers::fiber(fibers::launch::dispatch, &Consumer::start, this).detach();
std::cout << "complete Consumer()n";
}
void start() {
std::cout << "enter Consumer::start()n";
int i;
chan.pop(i);
std::cout << "complete Consumer::start()n";
}
void operator()(int i) {
std::cout << "enter Consumer::operator()n";
chan.push(i); 
std::cout << "complete Consumer::operator()n";
}    
};
int main() {
Consumer c;
Producer p;
p.sig.connect(std::ref(c));
p.start();
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
}

首先,在"绿色丝线":使CCD_ 8。

接下来,选择dispatch意味着当前执行的逻辑线程将立即挂起,以支持启动的光纤。这使得您可能不会同时启动通道的两端。

现在,因为所有光纤都已分离,所以在光纤完成之前,您正在破坏消费者/生产者实例。

演示:

  • 以上更改
  • 更一致的跟踪
  • 使用冲洗std::endl;实现输出的即时性
  • 不从构造函数中启动光纤
  • 一致命名start(启动功能(与run_fiber
  • 使生产者/消费者的寿命可见
  • 检查并报告通道操作的结果

在Wandbox上直播

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
static constexpr auto how = boost::fibers::launch::dispatch;
struct tracer_t {
using S = char const*;
S n, e, l;
tracer_t(S n, S e, S l) : n(n), e(e), l(l) {
std::cout << e << " " << n << std::endl;
}
~tracer_t() { std::cout << l << " " << n << std::endl; }
};
struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)
struct Producer {
obj_tracer_t _lifetime{"Producer"};
boost::signals2::signal<void(int)> sig;
void start() {
FUNTRACE;
fiber(how, &Producer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; i < 10; i++) {
sig(i);
sleep_for(1s);
}
}
};
bool checked(boost::fibers::channel_op_status s) {
using S = boost::fibers::channel_op_status;
std::cout << " -> channel_op_status: " << [s] {
switch (s) {
case S::closed: return "Closed";
case S::empty: return "Empty"; break;
case S::timeout: return "Timeout"; break;
case S::full: return "Full"; break;
case S::success: return "Success";
default: return "?";
};
}() << std::endl;
return s == S::success;
}
#define CHECKED(op)                                                            
[&] {                                                                      
std::cout << "Channel operation: " << #op << std::endl;                
auto tmp = (op);                                                       
return checked(tmp);                                                   
}()
struct Consumer {
obj_tracer_t _lifetime{"Consumer"};
channel_t chan;
void start() {
FUNTRACE;
fiber(how, &Consumer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
int i = 0;
CHECKED(chan.pop(i));
}
void operator()(int i) {
FUNTRACE;
CHECKED(chan.push(i));
}    
};
int main() {
try {
FUNTRACE;
Consumer c;
Producer p;
p.sig.connect(std::ref(c));
p.start();
c.start();
std::cout << "done." << std::endl;
return 0;
} catch (std::exception const& e) {
std::cerr << "exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "unhandled exception" << std::endl;
}
std::cout << "exit." << std::endl;
return 1;
}

在我的机器上打印:

Enter int main()
Construct Consumer
Construct Producer
Enter void Producer::start()
Enter void Producer::fiber_run()
Enter void Consumer::operator()(int)
Channel operation: chan.push(i)
Leave void Producer::start()
Enter void Consumer::start()
Enter void Consumer::fiber_run()
Channel operation: chan.pop(i)
-> channel_op_status: Success
Leave void Consumer::fiber_run()
Leave void Consumer::start()
done.
Destruct Producer
Destruct Consumer
Leave int main()
-> channel_op_status: Success
Leave void Consumer::operator()(int)
Segmentation fault (core dumped)
real    0m1,112s
user    0m0,999s
sys     0m0,003s

正如您所看到的,明显的问题是消费者/生产者对象的生存期。Wandbox上的输出略有不同(可能是由于boost的调试构建?(

... [skipped]
Destruct Producer
Destruct Consumer
Leave int main()
-> channel_op_status: Closed
Leave void Consumer::operator()(int)
prog.exe: /opt/wandbox/boost-1.79.0-gcc-12.1.0/include/boost/signals2/detail/lwm_pthreads.hpp:60: void boost::signals2::mutex::lock(): Assertion `pthread_mutex_lock(&m_) == 0' failed.

经过的时间表明,一旦sleep_for(1s)完成——在销毁之后——事情就会出错。我们可以通过在破坏对象之前插入this_fiber::sleep_for(11s)来强制使用寿命。

还添加了:

  • 日志行的时间戳
  • 使消费类光纤成环,使其不会阻塞生产者

在Wandbox上直播

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
static constexpr auto how = boost::fibers::launch::dispatch;
static auto stamp() {
static const auto s = std::chrono::steady_clock::now();
return std::to_string((std::chrono::steady_clock::now() - s) / 1ms) + "ms";
}
static auto& log() { return std::cout << std::setw(7) << stamp() << " "; }
struct tracer_t {
using S = char const*;
S n, e, l;
tracer_t(S n, S e, S l) : n(n), e(e), l(l) {
log() << e << " " << n << std::endl;
}
~tracer_t() { log() << l << " " << n << std::endl; }
};
struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)
struct Producer {
obj_tracer_t _lifetime{"Producer"};
boost::signals2::signal<void(int)> sig;
void start() {
FUNTRACE;
fiber(how, &Producer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; i < 10; i++) {
sig(i);
sleep_for(1s);
}
}
};
bool checked(boost::fibers::channel_op_status s) {
using S = boost::fibers::channel_op_status;
log() << " -> channel_op_status: " << [s] {
switch (s) {
case S::closed: return "Closed";
case S::empty: return "Empty"; break;
case S::timeout: return "Timeout"; break;
case S::full: return "Full"; break;
case S::success: return "Success";
default: return "?";
};
}() << std::endl;
return s == S::success;
}
#define CHECKED(op)                                                            
[&] {                                                                      
log() << "Channel operation: " << #op << std::endl;                
auto tmp = (op);                                                       
return checked(tmp);                                                   
}()
struct Consumer {
obj_tracer_t _lifetime{"Consumer"};
channel_t chan;
void start() {
FUNTRACE;
fiber(how, &Consumer::fiber_run, this).detach();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; CHECKED(chan.pop(i));) {
log() << " -> popped: " << i << std::endl;
if (i == 9)
break;
}
}
void operator()(int i) {
FUNTRACE;
CHECKED(chan.push(i));
}    
};
int main() {
try {
FUNTRACE;
Consumer c;
Producer p;
p.sig.connect(std::ref(c));
p.start();
c.start();
sleep_for(11s);
log() << "done." << std::endl;
return 0;
} catch (std::exception const& e) {
std::cerr << "exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "unhandled exception" << std::endl;
}
log() << "exit." << std::endl;
return 1;
}

打印

0ms Enter int main()
0ms Construct Consumer
0ms Construct Producer
0ms Enter void Producer::start()
0ms Enter void Producer::fiber_run()
0ms Enter void Consumer::operator()(int)
0ms Channel operation: chan.push(i)
0ms Leave void Producer::start()
0ms Enter void Consumer::start()
0ms Enter void Consumer::fiber_run()
0ms Channel operation: chan.pop(i)
0ms  -> channel_op_status: Success
0ms  -> popped: 0
0ms Channel operation: chan.pop(i)
0ms Leave void Consumer::start()
0ms  -> channel_op_status: Success
0ms Leave void Consumer::operator()(int)
1000ms Enter void Consumer::operator()(int)
1000ms Channel operation: chan.push(i)
1000ms  -> channel_op_status: Success
1000ms  -> popped: 1
1000ms Channel operation: chan.pop(i)
1000ms  -> channel_op_status: Success
1000ms Leave void Consumer::operator()(int)
2000ms Enter void Consumer::operator()(int)
2000ms Channel operation: chan.push(i)
2000ms  -> channel_op_status: Success
2000ms  -> popped: 2
2000ms Channel operation: chan.pop(i)
2000ms  -> channel_op_status: Success
2000ms Leave void Consumer::operator()(int)
3000ms Enter void Consumer::operator()(int)
3000ms Channel operation: chan.push(i)
3000ms  -> channel_op_status: Success
3000ms  -> popped: 3
3000ms Channel operation: chan.pop(i)
3000ms  -> channel_op_status: Success
3000ms Leave void Consumer::operator()(int)
4000ms Enter void Consumer::operator()(int)
4000ms Channel operation: chan.push(i)
4000ms  -> channel_op_status: Success
4000ms  -> popped: 4
4000ms Channel operation: chan.pop(i)
4000ms  -> channel_op_status: Success
4000ms Leave void Consumer::operator()(int)
5001ms Enter void Consumer::operator()(int)
5001ms Channel operation: chan.push(i)
5001ms  -> channel_op_status: Success
5001ms  -> popped: 5
5001ms Channel operation: chan.pop(i)
5001ms  -> channel_op_status: Success
5001ms Leave void Consumer::operator()(int)
6001ms Enter void Consumer::operator()(int)
6001ms Channel operation: chan.push(i)
6001ms  -> channel_op_status: Success
6001ms  -> popped: 6
6001ms Channel operation: chan.pop(i)
6001ms  -> channel_op_status: Success
6001ms Leave void Consumer::operator()(int)
7001ms Enter void Consumer::operator()(int)
7001ms Channel operation: chan.push(i)
7001ms  -> channel_op_status: Success
7001ms  -> popped: 7
7001ms Channel operation: chan.pop(i)
7001ms  -> channel_op_status: Success
7001ms Leave void Consumer::operator()(int)
8001ms Enter void Consumer::operator()(int)
8001ms Channel operation: chan.push(i)
8001ms  -> channel_op_status: Success
8001ms  -> popped: 8
8001ms Channel operation: chan.pop(i)
8001ms  -> channel_op_status: Success
8001ms Leave void Consumer::operator()(int)
9001ms Enter void Consumer::operator()(int)
9001ms Channel operation: chan.push(i)
9001ms  -> channel_op_status: Success
9001ms  -> popped: 9
9001ms Leave void Consumer::fiber_run()
9002ms  -> channel_op_status: Success
9002ms Leave void Consumer::operator()(int)
10002ms Leave void Producer::fiber_run()
11000ms done.
11000ms Destruct Producer
11000ms Destruct Consumer
11000ms Leave int main()

但是

这似乎没有规模。正确的你可能想要

  1. std::enable_shared_from_this派生Consumer/Productor,即使分离光纤,也能自动控制寿命
  2. 不要分离纤维,这样你就可以在销毁相关物体之前对其进行join
  3. 合作以实现优雅的关闭。例如,您可以检测到一个闭合通道作为停止光纤的标志,或者您可以推送一个特殊的哨值(例如,本例中的9(以发出结束信号

组合2。和3。在这里还请注意测试强制提前关闭的注释位:

//// E.g. for forced shutdown:
// { sleep_for(4s); c.close(); }

Wandbox直播

#include <boost/fiber/all.hpp>
#include <boost/signals2/signal.hpp>
#include <iomanip>
#include <iostream>
using namespace std::chrono_literals;
using boost::this_fiber::sleep_for;
using channel_t = boost::fibers::unbuffered_channel<int>;
using boost::fibers::fiber;
static constexpr auto how = boost::fibers::launch::dispatch;
static auto stamp() {
static const auto s = std::chrono::steady_clock::now();
return std::to_string((std::chrono::steady_clock::now() - s) / 1ms) + "ms";
}
static auto& log(auto const&... args) {
return ((std::cout << std::setw(7) << stamp() << " ") << ... << args)
<< std::endl;
}
struct tracer_t {
using S = char const*;
S n, e, l;
tracer_t(S n, S e, S l) : n(n), e(e), l(l) { log(e, " ", n); }
~tracer_t() { log(l, " ", n); }
};
struct fun_tracer_t : tracer_t { fun_tracer_t(S n) : tracer_t(n, "Enter", "Leave") {} };
struct obj_tracer_t : tracer_t { obj_tracer_t(S n) : tracer_t(n, "Construct", "Destruct") {} };
#define FUNTRACE fun_tracer_t _local(__PRETTY_FUNCTION__)
static constexpr int SENTINEL = -99;
struct Producer {
obj_tracer_t _lifetime{"Producer"};
boost::signals2::signal<bool(int)> _sig;
fiber _fib;
~Producer() {
FUNTRACE;
if (_fib.joinable())
_fib.join();
}
void start() {
FUNTRACE;
_fib = {how, &Producer::fiber_run, this};
}
void fiber_run() {
FUNTRACE;
for (int i = 0; i < 10; i++) {
if (!_sig(i).value_or(false)) {
log("Aborting producer");
return;
}
sleep_for(1s);
}
log("Closing producer");
_sig(SENTINEL);
}
};
bool checked(boost::fibers::channel_op_status s) {
using S = boost::fibers::channel_op_status;
log(" -> channel_op_status: ", [s] {
switch (s) {
case S::closed: return "Closed";
case S::empty: return "Empty"; break;
case S::timeout: return "Timeout"; break;
case S::full: return "Full"; break;
case S::success: return "Success";
default: return "?";
};
}());
return s == S::success;
}
#define CHECKED(op)                                                            
[&] {                                                                      
log("Channel operation: ", #op);                                       
auto tmp = (op);                                                       
return checked(tmp);                                                   
}()
struct Consumer {
obj_tracer_t _lifetime{"Consumer"};
channel_t _chan;
fiber     _fib;
~Consumer() {
FUNTRACE;
close();
if (_fib.joinable())
_fib.join();
}
void start() { FUNTRACE;
_fib = {how, &Consumer::fiber_run, this};
}
void close() {
if (!_chan.is_closed())
_chan.close();
}
void fiber_run() {
FUNTRACE;
for (int i = 0; CHECKED(_chan.pop(i));) {
log(" -> popped: ", i);
if (i == SENTINEL)
break;
}
close();
}
bool operator()(int i) {
FUNTRACE;
return CHECKED(_chan.push(i));
}    
};
int main() {
try {
FUNTRACE;
Consumer c;
Producer p;
p._sig.connect(std::ref(c));
p.start();
c.start();
log("done.");
//// E.g. for forced shutdown:
// { sleep_for(4s); c.close(); }
return 0;
} catch (std::exception const& e) {
log("exception: ", e.what());
} catch (...) {
log("unhandled exception");
}
log("exit.");
return 1;
}

打印预期

0ms Enter int main()
0ms Construct Consumer
0ms Construct Producer
0ms Enter void Producer::start()
0ms Enter void Producer::fiber_run()
0ms Enter bool Consumer::operator()(int)
0ms Channel operation: _chan.push(i)
0ms Leave void Producer::start()
0ms Enter void Consumer::start()
0ms Enter void Consumer::fiber_run()
0ms Channel operation: _chan.pop(i)
0ms  -> channel_op_status: Success
0ms  -> popped: 0
// ...
10002ms Closing producer
10002ms Enter bool Consumer::operator()(int)
10002ms Channel operation: _chan.push(i)
10002ms  -> channel_op_status: Success
10002ms  -> popped: -99
10002ms Leave void Consumer::fiber_run()
10002ms  -> channel_op_status: Closed
10002ms Leave bool Consumer::operator()(int)
10002ms Leave void Producer::fiber_run()
10002ms Leave Producer::~Producer()
10002ms Destruct Producer
10002ms Enter Consumer::~Consumer()
10002ms Leave Consumer::~Consumer()
10002ms Destruct Consumer
10002ms Leave int main()

最新更新