I/O流真的线程安全吗?



我写了一个程序,在第一个线程中将随机数写入一个文件,另一个线程从那里读取这些随机数,并将这些素数写入另一个文件。需要第三个线程来停止/启动工作。我读到I/O线程是线程安全的。既然写入单个共享资源是线程安全的,那么问题会是什么呢?输出:numbers.log中总是有正确的记录,在numbers_prime.log中有时没有记录,当有素数时,有时都写。


#include <iostream>
#include <fstream>
#include <thread>
#include <mutex>
#include <vector>
#include <condition_variable>
#include <future>
#include <random>
#include <chrono>
#include <string>

using namespace std::chrono_literals;
std::atomic_int ITER_NUMBERS = 30;
std::atomic_bool _var = false;
bool ret() { return _var; }
std::atomic_bool _var_log = false;
bool ret_log() { return _var_log; }
std::condition_variable cv;
std::condition_variable cv_log;
std::mutex              mtx;
std::mutex mt;
std::atomic<int> count{0};
std::atomic<bool> _FL = 1;
int MIN = 100;
int MAX = 200;
bool is_empty(std::ifstream& pFile) // function that checks if the file is empty
{
return pFile.peek() == std::ifstream::traits_type::eof();
}

bool isPrime(int n) // function that checks if the number is prime
{
if (n <= 1)
return false;

for (int i = 2; i <= sqrt(n); i++)
if (n % i == 0)
return false;

return true;
}

void Log(int min, int max) { // function that generates random numbers and writes them to a file numbers.log
std::string str;
std::ofstream log;
std::random_device seed;
std::mt19937 gen{seed()};
std::uniform_int_distribution dist{min, max};
log.open("numbers.log", std::ios_base::trunc);
for (int i = 0; i < ITER_NUMBERS; ++i, ++count) {
std::unique_lock<std::mutex> ulm(mtx);
cv.wait(ulm,ret);
str = std::to_string(dist(gen)) + 'n';
log.write(str.c_str(), str.length());
log.flush();
_var_log = true;
cv_log.notify_one();
//_var_log = false;
//std::this_thread::sleep_for(std::chrono::microseconds(500000));

}
log.close();
_var_log = true;
cv_log.notify_one();
_FL = 0;
}


void printCheck() { // Checking function to start/stop printing

std::cout << "Log to file? [y/n]n";
while (_FL) {
char input;
std::cin >> input;
std::cin.clear();
if (input == 'y') {
_var = true;
cv.notify_one();
}
if (input == 'n') {
_var = false;
}
}
}
void primeLog() { // a function that reads files from numbers.log and writes prime numbers to numbers_prime.log
std::unique_lock ul(mt);
int number = 0;
std::ifstream in("numbers.log");
std::ofstream out("numbers_prime.log", std::ios_base::trunc);
if (is_empty(in)) {
cv_log.wait(ul, ret_log);
}
int oldCount{};
for (int i = 0; i < ITER_NUMBERS; ++i) {
if (oldCount == count && count != ITER_NUMBERS) { // check if primeLog is faster than Log. If it is faster, then we wait to continue
cv_log.wait(ul, ret_log);
_var_log = false;
}
if (!in.eof()) {
in >> number;
if (isPrime(number)) {
out << number;
out << "n";
}
oldCount = count;
}
}
}

int main() {
std::thread t1(printCheck);
std::thread t2(Log, MIN, MAX);
std::thread t3(primeLog);
t1.join();
t2.join();
t3.join();
return 0;
}

这与I/O流线程安全无关。所示代码的逻辑被破坏了。

显示的代码似乎遵循将单个逻辑算法分解为多个部分的设计模式,并将它们分散到很远的地方。这使得理解它在做什么变得更加困难。我们重写一下,让逻辑更清晰一些。在primeLog中,让我们这样做:

cv_log.wait(ul, []{ return _var_log; });
_var_log = false;

现在更清楚的是,它等待_var_log被设置,然后继续其愉快的方式。设置完成后立即重置。

后面的代码读正好是的一个数字文件,前循环回到这里。所以,primeLog的主循环将一直处理一个号码,在每个迭代循环。

现在的问题很容易看到,一旦我们走到另一边,做同样的澄清:

std::unique_lock<std::mutex> ulm(mtx);
cv.wait(ulm,[]){ return _var; });
// Code that generates one number and writes it to the file
_var_log = true;
cv_log.notify_one();

一旦_var设置为true,它将保持为true。这个循环开始全速运行,不断迭代。在循环的每次迭代中,它盲目地将_var_log设置为true,并向另一个线程的条件变量发出信号。

c++的执行线程是完全相互独立的,除非它们以某种方式显式同步。

在另一个执行线程醒来并决定从文件中读取第一个数字之前,没有什么可以阻止这个循环全速运行,通过它的整个数字范围。它会这样做,然后返回等待它的条件变量再次得到信号,等待下一个数字。它对第二个数字的希望和梦想将无法满足。

在生成线程循环的每次迭代中,另一个执行线程的条件变量都得到信号。

条件变量不是信号量。如果在条件变量发出信号时没有任何东西在等待,那就太糟糕了。当某个执行线程决定等待一个条件变量时,它可能会被唤醒,也可能不会被唤醒。

这两个执行线程中的一个依赖于它在每次循环迭代时接收条件变量通知。

另一个执行线程中的逻辑无法实现此保证。这可能不是唯一的缺陷,可能还有其他缺陷,经过进一步分析,这只是最明显的逻辑缺陷。

感谢那些写关于write -behind-write的文章的人,现在我知道了更多。但这不是问题所在。主要问题是,如果它是一个新文件,当在is_empty函数中调用pFile.peek()时,我们将文件标志永久地设置为eofbit。因此,直到程序结束in.rdstate() == std::ios_base::eofbit.

修复:重置标志位状态

if (is_empty(in)) {
cv_log.wait(ul, ret_log);
}
in.clear(); // reset state

还有一个问题是,从不同的线程读/写一个文件的特性,虽然这不是我的程序错误的原因,但它导致了另一个错误。

因为如果当我再次运行程序时,primeLog()打开std::ifstream in("numbers.log")的读取速度比log.open("numbers.log", std::ios_base::trunc)快,那么in将把旧数据保存到它的缓冲区中,而log.open将用std::ios_base::trunc标志擦除它们。因此,我们将读写旧数据到numbers_prime.log