使用BlockingCollection将文件复制到另一个文件.目标不同于源,有时是垃圾.我忽略了我的代码中的任何东西



编辑&更新- 我已经尝试了相同的代码现在在我的个人电脑上,它的工作非常好。我能够使用相同的代码复制任何类型的文件,没有任何问题。当我在工作计算机上运行代码时,我遇到了这个问题。我只是不明白为什么这取决于电脑。请让我知道我是否在这里遗漏了什么。

在readTask中,我依次读取文件并将字节添加到BlockingCollection中。在消费任务中,我正在读取数据,因为它出现在BlockingCollection中并将其写入文件。因为,默认情况下,BlockingCollection是ConcurrentQueue上的包装器,我希望从阻塞队列中读取的顺序与向其写入的顺序相同。但是当我比较目标文件和源文件时,它是完全不同的,有时我看到重复。
我的源文件只是一个数字序列,每个数字在新的一行,如下所示。

1
2
3
4
5
6
7
8
9
10

在我的文件我有大约5000个数字的文件有足够的大小。这个代码有什么问题吗?或者这不是阻塞收集应该工作的方式。在这个例子中,我正在写入一个文件,但在现实中,我需要将这个文件推送到一个Rest API,数据按顺序发送是很重要的。如果字节不能按顺序发送,则文件存储在服务器上时会损坏。

static void Main(string[] args)
    {
        BlockingCollection<byte[]> bc = new BlockingCollection<byte[]>(10);
        Task consumeTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenWrite(@"C:Temppass_new.txt");
            foreach (byte[] data in bc.GetConsumingEnumerable())
            {
                fs.Write(data, 0, data.Length);
            }
            fs.Close();
        });
        Task readTask = Task.Factory.StartNew(() =>
        {
            var fs = File.OpenRead(@"C:Temppass.txt");
            var bufferSize = 4096;
            var buffer = new byte[bufferSize];
            int bytesRead = 0;
            while ((bytesRead = fs.Read(buffer, 0, buffer.Length)) != 0)
            {
                byte[] dataToWrite = buffer;
                if (bytesRead < bufferSize)
                {
                    dataToWrite = new byte[bytesRead];
                    Array.Copy(buffer, dataToWrite, bytesRead);
                }
                bc.Add(dataToWrite);
            }
            fs.Close();
        }).ContinueWith(ant => bc.CompleteAdding());
        consumeTask.Wait();
    }

我认为这是因为当bytesRead == bufferSize。想象一下这两个序列(我在这里松散地使用"指针"一词来指代引用变量,我认为它能更好地表达意思)。

首先,当你小于缓冲区大小时:

  1. buffer指向内存中4096大小的新字节数组。
  2. fs.Readbuffer指向的对象写入20字节。
  3. dataToWrite指向与buffer相同的对象
  4. dataToWrite指向一个大小为20字节的新字节数组。
  5. 从对象buffer点到对象dataToWrite点,复制20字节。
  6. 在阻塞集合中放置一个指向dataToWrite所指向的对象的指针。
  7. fs.Readbuffer指向的对象写入30字节。

现在将其与满足缓冲区大小的情况进行比较。

  1. buffer指向内存中4096大小的新字节数组。
  2. fs.Readbuffer所指向的对象写入4096字节。
  3. dataToWrite指向与缓冲区相同的对象。
  4. 在阻塞集合中放置一个指向dataToWrite所指向对象的指针。
  5. fs.Readbuffer所指向的对象写入30字节。

由于dataToWrite, buffer和添加到阻塞集合中的项都指向同一个对象,所以最后一个fs.Read将修改刚刚存储在集合中的字节数组。

删除你的if语句,总是分配一个新的dataToWrite,你的程序应该工作得很好。

我会尝试Take()(而不是。getconsumingenumerable ())
GetConsumingEnumerable应该仍然有效
即使它是默认的我也会使用ConcurrentConcurrentQueue

参见文档

BlockingCollection。采取方法

项被删除的顺序取决于对象的类型集合,用于创建BlockingCollection实例。当你创建一个BlockingCollection对象,您可以指定的类型使用的集合。例如,您可以指定一个ConcurrentConcurrentQueue对象用于先进先出(FIFO)行为,或ConcurrentStack对象的后进先出(LIFO)行为。

对于重复项。我用过很多次,从来不知道BlockingCollection会产生重复。我使用它为数据库提供数据,其中副本将违反约束。

相关内容

  • 没有找到相关文章

最新更新