我知道如何使用std::string和std::unordered_set来实现这一点,但是,集合中的每一行和每一个元素都占用了大量不必要的低效内存,导致一个unordered.set和文件中一半的行比文件本身大5-10倍。
例如,是否有可能(如果有)以某种方式减少内存消耗,以便您可以使用不超过20 GB的RAM从10 GB的文件中删除重复项?当然,在这种情况下,有必要以O(n)的速度进行。
您可以使用每行的哈希快速找到重复的行,如其他答案所示。但是,如果您只存储散列,那么就假设没有散列冲突。如果你使用std::hash
,那就不是真的了。如果你使用一个好的加密散列,你可能会逃脱惩罚。
由于您的输入只有10G,我建议采用不同的方法。好吧,除了琐碎的事。10G可能只是加载到内存中,并将每一行作为字符串存储在现代系统中。
但让我们节省一些内存:
- 首先,您应该对文件进行mmap,这样它的所有数据都可以从C++访问,而不会同时加载到内存中
- 创建
std::unordered_multimap<std::size_t, std::string_view> lines;
以跟踪输入中已看到的行 - 在输入文件上循环并为每行文本创建一个string_view,计算哈希并在
lines
中查找。如果存在散列,则将该行与具有相同散列的其他行进行比较。如果该行是唯一的,则将其添加到lines
并输出
我认为每(唯一)行将使用32字节的内存。因此,对于短线,所需的内存可能比输入文件还多。另一方面,对于短线,可能会有很多不那么独特的线。
附言:你可以通过只存储每一行的开头来节省内存。如果你估计了(唯一的)行的数量,你可以使用一个具有不同冲突策略的哈希表(没有bin)将其降低到每行8字节。
当然,您可以在O(n^2)时间内完成这项工作,只需使用容纳两行所需的内存量、一个布尔标志和两个文件偏移量。
基本算法是:
- 打开输入文件
- 打开输出文件
- 将标志设置为false
- 将预读偏移量设置为0
- 更多输入:
- 读取第一个输入行
- 将当前输入文件偏移量保存为读取后偏移量
- 将输入文件查找到偏移量0
- 当当前输入文件偏移量小于预读偏移量时:
- 读取第二个输入行
- 如果第一输入线等于第二输入线:
- 将标志设置为true
- Break
- 如果标志为false:
- 将第一个输入行写入输出文件
- 将标志设置为false
- 查找要发布读取偏移的输入文件
- 将读取前偏移设置为读取后偏移
当然,这是非常低效的时间,但它的内存效率与您所能获得的一样高。
可能的C++实现:
std::ifstream input(inputFilePath);
std::ofstream output(outputFilePath);
std::streamoff offset_before = 0;
std::streamoff offset_after = 0;
bool found_dupe = false;
std::string line1;
while (std::getline(input, line1)) {
offset_after = input.tellg();
input.seekg(0);
std::string line2;
while (input.tellg() < offset_before && std::getline(input, line2)) {
if (line1 == line2) {
found_dupe = true;
break;
}
}
if (!found_dupe) {
output << line1 << 'n';
}
found_dupe = false;
input.seekg(offset_after);
offset_before = offset_after;
}
实时演示
我知道现在回答这个问题有点晚了,但为了好玩,我写了一个实现,我认为它非常节省内存,同时仍然具有合理的性能。
特别是,此解决方案在O(N*log(N))
时间内运行,(在我的机器上)仅使用360 KB的堆内存,同时对包含99990000个随机排列的重复行的100000000行(5 GB)文本文件进行去重处理,并在6分32秒内完成。
当然,它确实有点欺骗,因为它向磁盘写入了一个临时索引文件(索引包含输入文件中每一行的哈希值,以及该行在输入文件中的位置)。索引文件每行文本需要16个字节,所以在我的测试中它的大小约为1.4GB。
为了进行重复数据消除,程序mmap()
将索引文件放入RAM,按哈希码对其内容进行排序,然后扫描索引,并使引用输入文件中相同字符串的具有相同哈希码的任何现在相邻的条目无效。
之后,按字节偏移量对索引文件进行重新排序,然后再次遍历索引以生成消除重复的输出文件。
我的测试运行(在2018年英特尔Mac Mini上)的输出如下:
Jeremys-Mac-mini-2:~ jaf$ time ./a.out
Step 1: Read input file [big_input_file.txt] and write index file [index_file.tmp]
Step 2: mmap() index file [index_file.tmp] into RAM and sort its entries by hash-code
Step 3: Iterate through index file [index_file.tmp] and invalidate any duplicate entries
Step 4: Re-sort the index file [index_file.tmp] by byte-offset in preparation for generating output
Step 5: Write output file [big_output_file.txt]
Step 6: Delete index file and exit
Final result: Out of 100000001 lines read, 99990000 duplicate lines detected and removed.
real 6m32.800s
user 3m39.215s
sys 2m41.448s
源代码如下(我用g++ -std=c++20 -O3 ./dedup_file.cpp
编译):
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/mman.h>
#include <array>
#include <fstream>
#include <iostream>
#include <span>
#include <string>
using SizeTPair = std::array<size_t, 2>;
static const SizeTPair INVALID_INDEX_ENTRY = {(std::size_t)-1, (std::size_t)-1}; // special value meaning "not a valid index entry"
// Given a pointer into the contents of the input file, returns a string_view representing
// the line of text starting there. (This is necessary since we can't modify the input file
// and the lines in the input file are not NUL-terminated)
static std::string_view GetStringAtOffset(const char * inputFileMap, size_t offset)
{
if (offset == (size_t)-1) return "";
const char * s = &inputFileMap[offset];
const char * nl = strchr(s, 'n');
return nl ? std::string_view(s, nl-s) : std::string_view(s);
}
// Comparison functor to sort SizeTPairs by the text they point to
// breaks ties by sorting by line-number (so that if a duplicate line is
// detected in the text, it will always be the second instance of that line that
// is excluded from our program's output, not the first instance)
class SortIndicesByStringCompareFunctor
{
public:
SortIndicesByStringCompareFunctor(const char * inputFileMap) : _inputFileMap(inputFileMap) {/* empty */}
bool operator()(const SizeTPair & a, const SizeTPair & b) const
{
const std::string_view textA = GetStringAtOffset(_inputFileMap, a[0]);
const std::string_view textB = GetStringAtOffset(_inputFileMap, b[0]);
if (textA != textB) return (textA < textB);
return (a[1] < b[1]); // sub-sort by line number
}
private:
const char * _inputFileMap;
};
static void WriteEntryToIndexFile(std::ofstream & indexFile, const SizeTPair & entry, size_t & indexSizeItems)
{
indexFile.write(reinterpret_cast<const char *>(&entry), 2*sizeof(size_t));
indexSizeItems++;
}
int main(int, char **)
{
const char * bigInputFileName = "big_input_file.txt";
const char * indexFileName = "index_file.tmp";
const char * bigOutputFileName = "big_output_file.txt";
std::cout << "Step 1: Read input file [" << bigInputFileName << "] and write index file [" << indexFileName << "]" << std::endl;
// Step 1. Read through the big input-text file, and generate a binary
// index-file containing (for each line) that line's hash-code and also
// its location in the input file
size_t indexSizeItems = 0;
size_t inputFileSizeBytes = 0;
{
std::ifstream inputFile;
inputFile.open(bigInputFileName, std::ios_base::binary | std::ios_base::ate); // binary only so we can get valid file-offsets out of tellg()
inputFileSizeBytes = inputFile.tellg(); // get file size
inputFile.seekg(0, std::ios_base::beg); // then go back to the beginning of the file so we can read it
std::ofstream indexFile;
indexFile.open(indexFileName, std::ios_base::binary);
std::string nextLine;
while(inputFile.good())
{
const std::streampos curFileOffset = inputFile.tellg(); // do this before reading the line: record our current read-offset into the file
std::getline(inputFile, nextLine);
WriteEntryToIndexFile(indexFile, {std::hash<std::string>()(nextLine), (std::size_t)curFileOffset}, indexSizeItems);
}
// Add a final dummy-entry to the end of the index, just to force the flushing of any
// final text-line(s) in our for-loop in step (3)
WriteEntryToIndexFile(indexFile, INVALID_INDEX_ENTRY, indexSizeItems);
}
std::cout << "Step 2: mmap() index file [" << indexFileName << "] into RAM and sort its entries by hash-code" << std::endl;
// Step 2. mmap() the index-file we just generated, and sort its contents by hash-code (sub-sort by byte-offset)
const int indexFD = open(indexFileName, O_RDWR, (mode_t)0666);
if (indexFD < 0) {std::cerr << "Couldn't open() index file!" << std::endl; exit(10);}
char * indexFileMap = (char *) mmap(0, indexSizeItems*(2*sizeof(size_t)), PROT_READ | PROT_WRITE, MAP_SHARED, indexFD, 0);
if (indexFileMap == MAP_FAILED) {std::cerr << "mmap() of index file failed!" << std::endl; exit(10);}
SizeTPair * index = reinterpret_cast<SizeTPair *>(indexFileMap);
std::span<SizeTPair> indexSpan(index, index+indexSizeItems);
std::sort(std::begin(indexSpan), std::end(indexSpan));
std::cout << "Step 3: Iterate through index file [" << indexFileName << "] and invalidate any duplicate entries" << std::endl;
// Step 3. Go through the index file and invalidate any duplicate
// entries (i.e. any entries that have the same hash code and same
// underlying string as a previous entry)
const int inputFD = open(bigInputFileName, O_RDONLY, (mode_t)0666);
if (inputFD < 0) {std::cerr << "Couldn't open() input file!" << std::endl; exit(10);}
const char * inputFileMap = (const char *) mmap(0, inputFileSizeBytes, PROT_READ, MAP_SHARED, inputFD, 0);
if (indexFileMap == MAP_FAILED) {std::cerr << "mmap() of index file failed!" << std::endl; exit(10);}
size_t dupesRemoved = 0;
ssize_t runStartIdx = -1;
for (size_t i=0; i<indexSizeItems; i++)
{
SizeTPair & curEntry = index[i];
// swap to put the line number in [0] and the hash in [1], since in the future
// we will want to sort by line number and this will make it easier to do that.
std::swap(curEntry[0], curEntry[1]);
const size_t curByteOffset = curEntry[0];
const size_t curHash = curEntry[1];
if (runStartIdx >= 0)
{
if (curHash != index[runStartIdx][1])
{
// A run of identical hashes started at (runStartIdx) and ended just before (i)
if ((i-runStartIdx)>1)
{
// Re-sort the index-entries-with-identical-hashes by the strings they represent
// so that we can find and remove any duplicate-strings easily. (We have to do this
// because the same hash could, at least in principle, be associted with two different strings)
std::span<SizeTPair> duplicateHashesSpan(index+runStartIdx, index+i);
std::sort(std::begin(duplicateHashesSpan), std::end(duplicateHashesSpan), SortIndicesByStringCompareFunctor(inputFileMap));
std::string_view previousEntryTextLine;
for (size_t j=runStartIdx; j<i; j++)
{
const std::string_view curEntryTextLine = GetStringAtOffset(inputFileMap, index[j][0]);
if (curEntryTextLine == previousEntryTextLine)
{
index[j] = INVALID_INDEX_ENTRY;
dupesRemoved++;
}
previousEntryTextLine = curEntryTextLine;
}
}
runStartIdx = i;
}
}
else runStartIdx = i;
}
std::cout << "Step 4: Re-sort the index file [" << indexFileName << "] by byte-offset in preparation for generating output" << std::endl;
// Step 4. Re-sort the index file by byte-offset (note that each line's byte-offset is stored
// as the first entry in its SizeTPair now!)
std::sort(std::begin(indexSpan), std::end(indexSpan));
std::cout << "Step 5: Write output file [" << bigOutputFileName << "]" << std::endl;
// Step 5. Read through the big text file one more time, and
// write out only those lines that still exist in the index file
std::ofstream outputFile;
outputFile.open(bigOutputFileName);
for (size_t i=0; i<indexSizeItems; i++)
{
const SizeTPair & curEntry = index[i];
if (curEntry == INVALID_INDEX_ENTRY) break; // these will all have been sorted to the end so we can stop now
else outputFile << GetStringAtOffset(inputFileMap, curEntry[0]) << std::endl;
}
outputFile.close();
// Step 6. Clean up our mess and exit
std::cout << "Step 6: Delete index file and exit" << std::endl;
close(inputFD);
close(indexFD);
remove(indexFileName);
std::cout << "Final result: Out of " << (indexSizeItems-1) << " lines read, " << dupesRemoved << " duplicate lines detected and removed. " << std::endl;
return 0;
}
此代码逐行读取输入文件,只在内存中存储字符串的哈希值。如果以前没有看到该行,它会将结果写入输出文件。如果以前看到过这条线,它就没有任何作用。
它使用稀疏化来减少内存占用。
输入数据:
- 12 GB文件大小
- ~197.0.00000不同线路
- 线路长度<120个字符
构建:
- C++20
- 发布版本
- 不在Visual Studio中运行(未附加调试器)
处理:
- AMD Ryzen 2700X
- 32 GB RAM
- 希捷SSD
- 190秒
- 954 MB虚拟内存峰值
这足够好吗?我不能说,因为你的性能要求很模糊,而且你没有给出合适的性能比较数据。这可能取决于你的机器、你的数据、你的RAM大小、你的硬盘速度。。。
#include <chrono>
#include <iostream>
#include <fstream>
#include <algorithm>
#include <array>
#include <cstring>
#include <functional>
#include <random>
#include <string>
#include "spp.h"
int main()
{
std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
std::ifstream input;
std::ofstream output;
input.open("10gb.txt");
output.open("10gb_nodupes.txt");
std::string inputline;
spp::sparse_hash_map<size_t, size_t> hashes;
while (std::getline(input, inputline))
{
std::size_t hash = std::hash<std::string>{}(inputline);
if (!hashes.contains(hash))
{
output << inputline << 'n';
hashes[hash]=0;
}
}
input.close();
output.close();
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout << "Time difference = " << std::chrono::duration_cast<std::chrono::seconds>(end - begin).count() << "[s]" << std::endl;
std::cout << "Done";
}