使用C++对一个巨大文件中的文本行进行字典式外部排序



我们有一个200Gb的文件,文件中填充了除以"\n"的文本行。我们的服务器上有Linux,gcc,8GB RAM和无限的硬盘空间。从我们的角度来看,我们的要求是用C++实现对该文件的行进行字典排序的最有效方法。

我已经正确地对输入文件(大小高达2GB(的文本行进行了字典式的外部排序。但是当我用大于2GB的文件进行测试时,输出是不正确的。需要测试大于200GB的最大文件大小。

下面是我的代码,它在文件大小低于2GB的情况下运行良好。程序有3个参数作为命令行参数:以字节为单位的输入文件名、输出文件名和内存限制(我们将使用不同的内存限制进行测试,而不仅仅是8GB(程序应能够正确处理简单的边界情况,例如小文件,空文件,大于200GB的文件。它应该能很好地与非ascii符号配合使用,但是我们可以假设零字节不存在于文件中。我们还可以假设内存限制远大于最长行的大小。

这是代码。请帮我指出文件大小大于2GB时无法产生正确结果的原因。如果你能在下面给我一个代码的修改版本,使其在所有情况下都能工作,我将不胜感激。

#include <stdio.h>
#include <stdlib.h>
#include <fstream>
#include <sstream>
#include <iostream> 
#include <algorithm>
#include <vector>
#include <string>
#include <iterator>
//using namespace std;
#define FILESIZE  200000000000          // size of the file on disk
#define TOTAL_MEM 7000000000            // max items the memory buffer can hold
typedef std::vector<std::string> DataContainer;

// Function to sort lines of text according to their length
/*bool compare(std::string &a, std::string &b) 
{ 
if (a.size() < b.size())
return true;
else if (a.size() > b.size())
return false;
else
return a < b;
//return strcasecmp( a.c_str(), b.c_str() ) <= 0;
} 
long estimateSizeOfBlocks(long sizeoffile, int maxtmpfiles, long maxMemory) 
{
}
*/
std::string ToString(int val) {
std::stringstream stream;
stream << val;
return stream.str();
}
void ExternalSort(std::string infilepath, std::string outfilepath)
{
std::string line, new_line;
std::string tfile = "temp-file-";
int i, j, k;
DataContainer v;
std::ifstream infile;
if(!infile.is_open())
{   
std::cout << "Unable to open filen";           
}
infile.open(infilepath, std::ifstream::in);
// Check if input file is empty
if (infile.peek() == std::ifstream::traits_type::eof())
{
std::cout << "File is empty";
}
//std::vector<std::string> x(std::istream_iterator<std::string>(infile), {});
// Get size of the file
infile.seekg (0, infile.end);
int input_file_size = infile.tellg();
infile.seekg (0, infile.beg);
std::cout << "The size of the file chosen is (in bytes): " << input_file_size << "n";
// Estimate required number of runs to read all data in the input file
int runs_count;
if(input_file_size % TOTAL_MEM > 0)
runs_count = input_file_size/TOTAL_MEM + 1;     // if file size is fixed at 200GB, we just need to define FILESIZE: runs_count = FILESIZE/TOTAL_MEM
else
runs_count = input_file_size/TOTAL_MEM;
// Iterate through the elements in the file
for(i = 0; i < runs_count; i++)
{
// Step 1: Read M-element chunk at a time from the file
for (j = 0; j < (TOTAL_MEM < input_file_size ? TOTAL_MEM : input_file_size); j++)
{
while(std::getline(infile, line))
{
// If line is empty, ignore it
if(line.empty())
continue;
new_line = line + "n";
// Line contains string of length > 0 then save it in vector
if(new_line.size() > 0)
v.push_back(new_line);
}   
}
// Step 2: Sort M elements
sort(v.begin(), v.end());       //sort(v.begin(), v.end(), compare); 
// Step 3: Create temporary files and write sorted data into those files.
std::ofstream tf;
tf.open(tfile + ToString(i) + ".txt", std::ofstream::out | std::ofstream::app); 
std::ostream_iterator<std::string> output_iterator(tf, "n");
std::copy(v.begin(), v.end(), output_iterator);
/*  for(vector<std::string>::const_iterator it = v.begin(); it != v.end(); ++it)
tf << *it << "n";
*/
tf.close();
}   
infile.close();
// Step 4: Now open each file and merge them, then write back to disk
DataContainer vn;
for(i = 0; i < runs_count; i++)     
{
std::ifstream sortedFiles[runs_count];                              
std::ostringstream filename;
filename << tfile << i << ".txt";
sortedFiles[i].open(filename.str(), std::ifstream::in);
//sortedFiles[i].open(tfile + ToString(i) + ".txt", std::ifstream::in); 
std::string st, new_st;
while(std::getline(sortedFiles[i], st))
{
// If line is empty, ignore it
if(st.empty())
continue;
new_st = st + "n";
if(new_st.size() > 0)
vn.push_back(new_st);
}
sortedFiles[i].close();
}
// Step 5: Merge total sorted data
sort(vn.begin(), vn.end());
std::ofstream outfile;
outfile.open(outfilepath, std::ofstream::out | std::ofstream::app);     
std::ostream_iterator<std::string> output_iterator(outfile, "n");
std::copy(vn.begin(), vn.end(), output_iterator);
outfile.close();    // Close final sorted file
}
int main(int argc, char** argv)         //      e.g.     argv[1] = "E:\data.txt"           argv[2] = "E:\sorted_data.txt"
{
std::cout << "You have entered " << argc 
<< " arguments:n"; 
for (int i = 0; i < argc; ++i) 
std::cout << argv[i] << "n"; 
ExternalSort(argv[1], argv[2]);
return 0; 
}

请参阅下面的编辑,了解外部排序的完整实现-无保修。

假设您有两个升序排序的文件(每个数字都是一行(:

  • 文件A:1 3 5 7
  • 文件B:10 2 4 6 8
  • 预期结果是文件C:1 10 2 3 4 5 6 7 8

如果您简单地连接文件,就像您所做的那样,您将得到:1 3 5 7 10 2 4 6 8;这不是你所期望的。

您应该从这两个文件中读取并编写词典中最小的一行。对于包含相同行数和交替行的文件(例如,1 3 52 4 6(,可以很容易地理解这一点:

  1. 从A和B中读取第一行
  2. 把最小的写为C
  3. 从写入的编号所属的文件中读取下一行
  4. 转到2

如果行数不同,一旦到达一个文件的末尾,写入第二个文件的剩余部分

File A: 1 3 5 7
File B: 2 4 8
A B    C
--------
1 2 -> 1
3 2 -> 2
3 4 -> 3
5 4 -> 4
5 8 -> 5
7 8 -> 7
8 -> 8

一旦您有了合并两个文件的算法,就会在一对文件上运行它,直到您只得到一个文件为止。假设您已对文件A、B、C&D:

  1. 合并A&B转换为AB
  2. 删除A&B
  3. 合并C&D转换成CD
  4. 删除C&D
  5. 合并AB&CD放入ABCD这就是结果
  6. 删除AB&CD.

    A   B  C   D
    |   |  |   |
     /     /
    AB     CD
    |      | 
         /
    ABCD
    

这是一个匆忙编写的双向合并——无担保:

#include <iostream>
#include <iostream>
#include <fstream>
#include <string>
#include <algorithm>
class two_way_merge_t
{
public:
two_way_merge_t(std::istream& a, std::istream& b, std::ostream& c) : a_{ a }, b_{ b }, c_{ c }
{
// nop
}
void execute()
{
using namespace std;
// get first line from a; assumes a is not empty
a_.get();
// get first line from b; assumes b is not empty
b_.get();
// write whichever is less and advance
while (write_one())
;
}
protected:
// helper
struct reader_t
{
std::istream& is_;
std::string line_;
bool get()
{
return (bool)std::getline(is_, line_);
}
void write(std::ostream& os)
{
os << line_ << std::endl;
}
void write_rest(std::ostream& os)
{
do write(os); while (get());
}
};
reader_t a_, b_;
std::ostream& c_;
bool write_one()
{
using namespace std;
if (lexicographical_compare(a_.line_.begin(), a_.line_.end(), b_.line_.begin(), b_.line_.end()))
{
a_.write(c_);
if (!a_.get())
return b_.write_rest(c_), false;
}
else
{
b_.write(c_);
if (!b_.get())
return a_.write_rest(c_), false;
}
return true;
}
};
void validate(std::istream& is) // lines count not validated
{
using namespace std;
string prev;
string cur;
while (getline(is, cur))
{
if (!lexicographical_compare(prev.begin(), prev.end(), cur.begin(), cur.end()))
throw "bad two-way merge";
swap(prev, cur);
}
}
int main()
{
using namespace std;
{
ifstream a("c:\temp\a.txt");
if (!a)
return -1;
ifstream b("c:\temp\b.txt");
if (!b)
return -2;
ofstream c("c:\temp\c.txt");
if (!c)
return -3;
two_way_merge_t twm(a, b, c);
twm.execute();
}
ifstream c("c:\temp\c.txt");
if (!c)
return -4;
try
{
validate(c);
}
catch (const char* e)
{
cout << e;
return -5;
}
}

E D I T

#include <iostream>
#include <fstream>
#include <string>
#include <algorithm>
#include <vector>
#include <filesystem>
//-------------------------------------------------------------------
class two_way_merge_2_t
{
public:
two_way_merge_2_t(std::vector<std::string>& a, std::istream& b, std::ostream& c) : a_{ a }, b_{ b }, c_{ c }
{
// nop
}
void execute()
{
using namespace std;
// get first line from a; assumes a is not empty
if ( ! a_.get() )
return b_.write_rest(c_);
// get first line from b; assumes a is not empty
if (!b_.get())
return a_.write_rest(c_);
// we have managed to read from a and b: write whichever is less and advance
while (write_one())
;
}
protected:
// helper
struct isreader_t
{
std::istream& is_;
std::string line_;
bool get()
{
return (bool)std::getline(is_, line_);
}
void write(std::ostream& os)
{
os << line_ << std::endl;
}
void write_rest(std::ostream& os)
{
do write(os); while (get());
}
const std::string& line() const
{
return line_;
}
};
// helper
struct vreader_t
{
std::vector<std::string>& v_;
std::vector<std::string>::size_type i_ = std::vector<std::string>::size_type (-1);
bool get()
{
return ++i_ < v_.size();
}
void write(std::ostream& os)
{
os << v_[i_] << std::endl;
}
void write_rest(std::ostream& os)
{
do write(os); while (get());
}
const std::string& line() const
{
return v_[i_];
}
};
vreader_t a_;
isreader_t b_;
std::ostream& c_;
bool write_one()
{
using namespace std;
if (lexicographical_compare(a_.line().begin(), a_.line().end(), b_.line().begin(), b_.line().end()))
{
a_.write(c_);
if (!a_.get())
return b_.write_rest(c_), false;
}
else
{
b_.write(c_);
if (!b_.get())
return a_.write_rest(c_), false;
}
return true;
}
};

//-------------------------------------------------------------------
class partial_sort_t
{
public:
partial_sort_t(std::istream& a, std::istream& b, std::ostream& c, std::size_t mem)
: a_{ a }
, b_{ b }
, c_{ c }
, mem_{ mem }
{
// nop
}
void execute()
{
sort();
two_way_merge_2_t(va_, b_, c_).execute();
}
protected:
std::istream& a_;
std::istream& b_;
std::ostream& c_;
const std::size_t mem_;
std::vector<std::string> va_;
void sort()
{
std::streamsize max_gpos = a_.tellg() + std::streamsize(mem_);
std::string line;
va_.clear();
while (a_.tellg() < max_gpos && std::getline(a_, line) )
va_.push_back(std::move(line));
std::sort(va_.begin(), va_.end());
}
};

//-------------------------------------------------------------------
class external_sort_t
{
public:
external_sort_t(const char* fn, const std::size_t mem) : is_{ fn }, mem_{ mem }
{
// nop
}
const char* execute()
{
make_b();
make_c();
while (is_)
{
partial_sort_t(is_, b_, c_, mem_).execute();
swap_bc();
}
return done();
}
protected:
const std::size_t mem_;
std::ifstream is_;
std::ifstream b_;
std::string bpath_;
std::ofstream c_;
std::string cpath_;
static std::string make_temp_file()
{
std::string fn(512, 0);
tmpnam_s(&fn.front(), fn.size());
std::ofstream os(fn);
os.close();
return fn;
}
void make_b()
{
b_.close();
bpath_ = make_temp_file();
b_.open(bpath_);
}
void make_c()
{
c_.close();
cpath_ = make_temp_file();
c_.open(cpath_);
}
void swap_bc()
{
b_.close();
c_.close();
std::swap(bpath_, cpath_);
b_.open(bpath_);
c_.open(cpath_);
}
const char* done()
{
b_.close();
c_.close();
std::experimental::filesystem::remove(cpath_);
return bpath_.c_str();
}
};
void validate(std::istream& is) // lines count not validated
{
using namespace std;
string prev;
string cur;
while (getline(is, cur))
{
if ( strcmp(prev.c_str(), cur.c_str()) > 0 )
throw "bad two-way merge";
swap(prev, cur);
}
}
void validate(const char* fn)
{
using namespace std;
ifstream is{ fn };
if (!is)
throw "cannot open";
validate(is);
}
void test_external_sort()
{
using namespace std;
external_sort_t es{ "c:\temp\a.txt", 10 };
std::string rfn = es.execute();
try
{
validate(rfn.c_str());
}
catch (const char* e)
{
cout << e << endl;
}
}
int main()
{
test_external_sort();
}

相关内容

  • 没有找到相关文章

最新更新