编辑&更新- 我已经尝试了相同的代码现在在我的个人电脑上,它的工作非常好。我能够使用相同的代码复制任何类型的文件,没有任何问题。当我在工作计算机上运行代码时,我遇到了这个问题。我只是不明白为什么这取决于电脑。请让我知道我是否在这里遗漏了什么。
在readTask中,我依次读取文件并将字节添加到BlockingCollection中。在消费任务中,我正在读取数据,因为它出现在BlockingCollection中并将其写入文件。因为,默认情况下,BlockingCollection是ConcurrentQueue上的包装器,我希望从阻塞队列中读取的顺序与向其写入的顺序相同。但是当我比较目标文件和源文件时,它是完全不同的,有时我看到重复。
我的源文件只是一个数字序列,每个数字在新的一行,如下所示。
1
2
3
4
5
6
7
8
9
10
在我的文件我有大约5000个数字的文件有足够的大小。这个代码有什么问题吗?或者这不是阻塞收集应该工作的方式。在这个例子中,我正在写入一个文件,但在现实中,我需要将这个文件推送到一个Rest API,数据按顺序发送是很重要的。如果字节不能按顺序发送,则文件存储在服务器上时会损坏。
static void Main(string[] args)
{
BlockingCollection<byte[]> bc = new BlockingCollection<byte[]>(10);
Task consumeTask = Task.Factory.StartNew(() =>
{
var fs = File.OpenWrite(@"C:Temppass_new.txt");
foreach (byte[] data in bc.GetConsumingEnumerable())
{
fs.Write(data, 0, data.Length);
}
fs.Close();
});
Task readTask = Task.Factory.StartNew(() =>
{
var fs = File.OpenRead(@"C:Temppass.txt");
var bufferSize = 4096;
var buffer = new byte[bufferSize];
int bytesRead = 0;
while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
{
byte[] dataToWrite = buffer;
if (bytesRead < bufferSize)
{
dataToWrite = new byte[bytesRead];
Array.Copy(buffer, dataToWrite, bytesRead);
}
bc.Add(dataToWrite);
}
fs.Close();
}).ContinueWith(ant => bc.CompleteAdding());
consumeTask.Wait();
}
我认为这是因为当bytesRead == bufferSize
。想象一下这两个序列(我在这里松散地使用"指针"一词来指代引用变量,我认为它能更好地表达意思)。
首先,当你小于缓冲区大小时:
- 让
buffer
指向内存中4096大小的新字节数组。 -
fs.Read
向buffer
指向的对象写入20字节。 - 让
dataToWrite
指向与buffer相同的对象 - 让
dataToWrite
指向一个大小为20字节的新字节数组。 - 从对象
buffer
点到对象dataToWrite
点,复制20字节。 - 在阻塞集合中放置一个指向
dataToWrite
所指向的对象的指针。 -
fs.Read
向buffer
指向的对象写入30字节。
现在将其与满足缓冲区大小的情况进行比较。
- 让
buffer
指向内存中4096大小的新字节数组。 -
fs.Read
向buffer
所指向的对象写入4096字节。 - 让
dataToWrite
指向与缓冲区相同的对象。 - 在阻塞集合中放置一个指向
dataToWrite
所指向对象的指针。 -
fs.Read
向buffer
所指向的对象写入30字节。
由于dataToWrite
, buffer
和添加到阻塞集合中的项都指向同一个对象,所以最后一个fs.Read
将修改刚刚存储在集合中的字节数组。
删除你的if语句,总是分配一个新的dataToWrite
,你的程序应该工作得很好。
我会尝试Take()(而不是。getconsumingenumerable ())
GetConsumingEnumerable应该仍然有效
即使它是默认的我也会使用ConcurrentConcurrentQueue
参见文档
BlockingCollection。采取方法
项被删除的顺序取决于对象的类型集合,用于创建BlockingCollection实例。当你创建一个BlockingCollection对象,您可以指定的类型使用的集合。例如,您可以指定一个ConcurrentConcurrentQueue对象用于先进先出(FIFO)行为,或ConcurrentStack对象的后进先出(LIFO)行为。
对于重复项。我用过很多次,从来不知道BlockingCollection会产生重复。我使用它为数据库提供数据,其中副本将违反约束。