如何使用反应式扩展检测大文件中唯一的行



我必须处理大型CSV文件(高达数十GB(,如下所示:

Key,CompletedA,CompletedB
1,true,NULL
2,true,NULL
3,false,NULL
1,NULL,true
2,NULL,true  

我有一个解析器,它将解析后的行生成为IEnumerable<Record>,这样我一次只读取一行到内存中。

现在,我必须按Key对记录进行分组,并检查列CompletedA和CompletedB在组中是否有值。在输出中,我需要组中没有CompletedA和CompletedB的记录。

在这种情况下,它是用键3记录的。

然而,在同一个数据集上有许多类似的处理,我不习惯多次迭代。

我想我可以将IEnumerable转换为IOobservable,并使用反应扩展来查找记录。

在IObservable集合上使用简单的Linq表达式,是否可以以内存高效的方式实现这一点?

如果Key是整数,我们可以尝试使用Dictionary和一次扫描:

// value: 0b00 - neither A nor B
//        0b01 - A only
//        0b10 - B only
//        0b11 - Both A and B    
Dictionary<int, byte> Status = new Dictionary<int, byte>();
var query = File
.ReadLines(@"c:MyFile.csv")
.Where(line => !string.IsNullOrWhiteSpace(line))
.Skip(1) // skip header 
.Select(line => YourParserHere(line));
foreach (var record in query) {
int mask = (record.CompletedA != null ? 1 : 0) |
(record.CompletedB != null ? 2 : 0); 
if (Status.TryGetValue(record.Key, out var value))
Status[record.Key] = (byte) (value | mask);
else
Status.Add(record.Key, (byte) mask);
}
// All keys that don't have 3 == 0b11 value (both A and B)  
var bothAandB = Status
.Where(pair => pair.Value != 3)
.Select(pair => pair.Key); 

我认为这将满足您的需求:

var result =
source
.GroupBy(x => x.Key)
.SelectMany(xs =>
(xs.Select(x => x.CompletedA).Any(x => x != null && x == true) && xs.Select(x => x.CompletedA).Any(x => x != null && x == true))
? new List<Record>()
: xs.ToList());

使用Rx在这里没有帮助。

是的,Rx库非常适合这种同步枚举一次/计算多次操作。您可以使用Subject<Record>作为一对多传播算子,然后您应该将各种Rx操作符附加到它上,然后您可以向它提供源枚举器中的记录,最后您将从附加的操作符中收集结果,这些操作符现在将完成。以下是基本模式:

IEnumerable<Record> source = GetRecords();
var subject = new Subject<Record>();
var task1 = SomeRxTransformation1(subject);
var task2 = SomeRxTransformation2(subject);
var task3 = SomeRxTransformation3(subject);
source.ToObservable().Subscribe(subject); // This line does all the work
var result1 = task1.Result;
var result2 = task2.Result;
var result3 = task3.Result;

SomeRxTransformation1SomeRxTransformation2等是接受IObservable<Record>并返回一些通用Task的方法。他们的签名应该是这样的:

Task<TResult> SomeRxTransformation1(IObservable<Record> source);

例如,您想要进行的特殊分组需要进行如下转换:

Task<Record[][]> GroupByKeyExcludingSomeGroups(IObservable<Record> source)
{
return source
.GroupBy(record => record.Key)
.Select(grouped => grouped.ToArray())
.Merge()
.Where(array => array.All(r => !r.CompletedA && !r.CompletedB))
.ToArray()
.ToTask();
}

当你把它合并到模式中时,它会看起来像这样:

Task<Record[][]> task1 = GroupByKeyExcludingSomeGroups(subject);
source.ToObservable().Subscribe(subject); // This line does all the work
Record[][] result1 = task1.Result;

最新更新