CsvHelper-异步读取流



我有一个服务,它接受一个包含CSV数据的输入流,该数据需要大容量插入数据库,我的应用程序尽可能使用async/await。

过程是:使用CsvHelper的CsvParser分析流,将每一行添加到DataTable,使用SqlBulkCopy将DataTable复制到数据库。

数据可以是任何大小,所以我希望避免一次将整个数据读入内存——很明显,到最后我会将所有数据都放在DataTable中,所以内存中基本上有两个副本。

我想尽可能异步地完成所有这些,但CsvHelper没有任何异步方法,所以我想出了以下解决方法:

using (var inputStreamReader = new StreamReader(inputStream))
{
    while (!inputStreamReader.EndOfStream)
    {
        // Read line from the input stream
        string line = await inputStreamReader.ReadLineAsync();
        using (var memoryStream = new MemoryStream())
        using (var streamWriter = new StreamWriter(memoryStream))
        using (var memoryStreamReader = new StreamReader(memoryStream))
        using (var csvParser = new CsvParser(memoryStreamReader))
        {
            await streamWriter.WriteLineAsync(line);
            await streamWriter.FlushAsync();
            memoryStream.Position = 0;
            // Loop through all the rows (should only be one as we only read a single line...)
            while (true)
            {
                var row = csvParser.Read();
                // No more rows to process
                if (row == null)
                {
                    break;
                }
                // Add row to DataTable
            }
        }
    }
}

这个解决方案有什么问题吗?有必要吗?我已经看到CsvHelper开发人员特别没有添加异步功能(https://github.com/JoshClose/CsvHelper/issues/202)但我并没有真正理解为什么不这么做。

编辑:我刚刚意识到,这个解决方案无论如何都不适用于列包含换行符的情况:(我想我只需要将整个输入流复制到MemoryStream或其他中

编辑2:更多信息。

这是在一个库中的异步方法中,我正试图在该方法中一直执行异步操作。它可能会被MVC控制器占用(如果我只是想从UI线程中卸载它,我只需要Task.Run()它)。大多数情况下,该方法将等待外部源,如数据库/DFS,我希望线程在空闲时被释放。

即使阻止的是读取流(例如,如果我试图读取的数据位于世界另一端的服务器上),CsvParser.Read()也会被阻止,而如果CsvHelper要实现一个使用TextReader.ReadAsync()的异步方法,那么我就不会被阻止等待数据从迪拜到达。据我所知,我并不是要求在同步方法周围使用异步包装器。

第三版:来自遥远未来的更新!异步功能实际上早在2017年就添加到了CsvHelper中。我希望我工作的公司有人从那时起升级到了新版本!

Eric lippert用在餐厅做饭的比喻解释了异步等待的有用性。根据他的解释,如果你的线程没有其他事情可做,异步地做某事是没有用的

另外,要注意,当你的线程正在做某事时,它不能做其他事情。只有当你的线程在等待某件事时,它才能做其他事情。在您的过程中等待的事情之一是读取文件。当线程逐行读取文件时,它必须等待几次才能读取行。在等待期间,它可以做其他事情,比如解析读取的CSV数据,并将解析后的数据发送到目的地。

分析数据不是一个线程必须等待其他进程完成的过程,就像读取文件或向数据库发送数据时一样。这就是为什么没有异步版本的解析过程。正常的异步等待无助于让线程保持忙碌,因为在解析过程中没有什么可等待的,所以在解析期间,线程没有时间做其他事情。

当然,您可以使用task.Run(()=>ParseReadData(…))将解析过程转换为一个不可用的任务,然后等待该任务完成,但与Eric Lippert的餐厅类似,这将是解冻厨师来完成这项工作,而您坐在柜台后面什么都不做。

但是,如果您的线程在解析读取的CSV数据时有一些有意义的事情要做,比如响应用户输入,那么在单独的任务中开始解析可能会很有用。

如果你的完整读取-解析-更新数据库进程不需要与用户交互,但你需要你的线程在执行该进程时可以自由地做其他事情,可以考虑将整个进程放在一个单独的任务中,并在不等待的情况下启动该任务。在这种情况下,你只使用接口线程来启动另一个任务,并且您的接口线程可以自由地做其他事情。与流程的总时间相比,启动此新任务的成本相对较小。

再一次:如果你的线程没有其他事情可做,让这个线程来处理,不要启动其他任务来处理。

这是一篇关于在同步方法上公开异步包装器的好文章,以及为什么CsvHelper没有做到这一点。http://blogs.msdn.com/b/pfxteam/archive/2012/03/24/10287244.aspx

如果您不想阻塞UI线程,请在后台线程上运行处理。

CsvHelper拉入一个数据缓冲区。缓冲区的大小是一个设置,如果您愿意,可以进行更改。如果你的服务器在世界的另一端,它会缓冲一些数据,然后读取。很可能,在使用缓冲区之前需要多次读取。

CsvHelper也会产生记录,所以如果您实际上没有得到一行,则不会读取任何内容。如果只读取几行,则只读取文件的那部分(实际上是缓冲区大小)。

如果你担心记忆力,有几个简单的选择。

  1. 缓冲数据。您可以一次大容量复制100或1000行,而不是整个文件。只要继续这样做,直到文件完成
  2. 使用FileStream。如果出于某种原因需要一次读取整个文件,请使用FileStream并将整个文件写入光盘。它会更慢,但你不会使用大量内存

相关内容

  • 没有找到相关文章

最新更新