使用临时文件处理大量数据



我正在尝试使用临时序列化文件(指针表包含[no, offset, length]在数据文件和数据文件中包含数据)实现线程间通信。一个线程应该接收数据(处理数据)并将其保存到内存中。第二个线程应该从内存中读取数据并显示结果。(输入线程只添加数据,输出线程只读取数据)

我必须将其编译为32位,所以我试图通过读写临时文件来解决2 GB的限制。

我实现了一个简单的例子。但问题是,如果I/O线程同时工作,则输出线程不能正确读取。如果输入线程写入并关闭文件,那么输出线程读取并关闭它就可以正常工作。我已经用shared_mutex和mutex做了同步,结果同样不好。

提前感谢您的回复。

更新:在使用ofstream将数据写入文件后,根据Update ifstream对象重置标志(stream.clear()),行为会变得更好,但有时仍然会失败,有时会通过。

主:

int main() {        
//Start input and output job
std::thread input = std::thread(inputJob);
std::thread output = std::thread(outputJob);

//Wait here for end
input.join();
output.join();

//HERE checking results

return 0;
}

输入工作:

void inputJob() {
is_on = true;
//Loading input data
for (int i = 1; i < 10; i++) {
student s("George", "Patton", 100000 + i, (i % 2) > 0, "0A");
for (int j = 1; j < 4; j++) s.subjects.push_back(subject(rand(), "MA", (j % 5)));
s1.push_back(s);
}
//Save to binary file
std::fstream data_stream, table_stream;
table_stream.open("./table.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
data_stream.open("./data.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
size_t off = 0;
if (table_stream.is_open() && data_stream.is_open()) for (size_t i = 0; i < s1.size(); i++) {
std::string tmp = s1[i].toBinaryString();
size_t sz = tmp.size();
table_row t(i, off, sz);
off += sz;
{
std::lock_guard lock(m);
table_stream << t.toBinaryString();
data_stream << tmp;
cout << "Written" << endl;
}       
}
table_stream.close();
data_stream.close();
is_on = false;
}

输出工作:

void outputJob() {
//Load from binary file
std::fstream data_stream, table_stream;
table_stream.open("./table.data", std::fstream::in | std::fstream::binary);
data_stream.open("./data.data", std::fstream::in | std::fstream::binary);
if (table_stream.is_open() && data_stream.is_open()) {
size_t row_sz = sizeof(table_row);
std::string line = "";
size_t index = 0;
unsigned table_r = 0;
bool was_empty = false;
while (is_on || (was_empty == false)) {
{
std::lock_guard lock(m);
if (tryGetData(table_stream, line, row_sz, table_r) == 0 && line.empty() == false) {
table_r += row_sz;
was_empty = false;
table_row row;
index = 0;
row.fromBinaryString(line, index, line.size());
if (tryGetData(data_stream, line, row.len, row.off) == 0 && line.empty() == false) {
was_empty = false;
index = 0;
student tmp;
tmp.fromBinaryString(line, index);
if (VAL_CHECK == 1) s2.push_back(tmp);
}
else was_empty = true;
}
else was_empty = true;
}
}

}
table_stream.close();
data_stream.close();
}

Try get data function:

int tryGetData(std::fstream &data_stream, std::string& data, size_t data_sz, size_t offset) {
int ret = 0;
data = "";
if (data_stream.is_open()) {
//Set ptr
if (offset != UINT32_MAX) data_stream.seekp(offset);
char c;
while (data_sz > 0 && data_stream.get(c)) {
data.push_back(c);
data_sz--;
}
if (data_stream.eof()) ret = 1;
}
return ret;
}

我能想到的最简单的解决方案是使用双缓冲。每种文件类型有两个,并确保输入线程写入一对文件,而输出线程总是读取另一对文件。

我能想到的所有其他解决方案都需要确保操作系统或文件库没有对文件进行任何缓存,因此将是特定于平台的。但是,如果这不是问题,请读取内存映射文件,例如。

主要问题是,写线程可能比读线程晚开始。如果读线程通过写线程等待打开文件,那么它工作得很好。

最新更新