如何在最短的时间内插入 1000 万条记录



我有一个文件(有1000万条记录(,如下所示:

    line1
    line2
    line3
    line4
   .......
    ......
    10 million lines

所以基本上我想在数据库中插入 1000 万条记录。所以我读取了文件并将其上传到SQL Server。

C# 代码

System.IO.StreamReader file = 
    new System.IO.StreamReader(@"c:test.txt");
while((line = file.ReadLine()) != null)
{
    // insertion code goes here
    //DAL.ExecuteSql("insert into table1 values("+line+")");
}
file.Close();

但是插入需要很长时间。如何使用 C# 在尽可能短的时间内插入 1000 万条记录?

更新 1:
批量插入:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:dt10000000dt10000000.txt'
WITH
(
     ROWTERMINATOR =' n'
  );

我的表格如下所示:

DATAs
(
     DatasField VARCHAR(MAX)
)

但我收到以下错误:

Msg 4866,级别 16,状态 1,第 1
行 批量加载失败。数据文件中的列对于第 1 行第 1 列来说太长。验证是否正确指定了字段终止符和行终止符。

Msg 7399,级别 16,状态 1,第 1
行 链接服务器"(空("的 OLE DB 提供程序"BULK"报告了错误。提供程序未提供有关错误的任何信息。

Msg 7330,级别 16,状态 2,第 1
行 无法从链接服务器"(null("的 OLE DB 提供程序"BULK"中获取行。

下面的代码有效:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:dt10000000dt10000000.txt'
WITH
(
    FIELDTERMINATOR = 't',
    ROWTERMINATOR = 'n'
);

请不要创建要通过 BulkCopy 加载的DataTable对于较小的数据集来说,这是一个不错的解决方案,但在调用数据库之前,绝对没有理由将所有 1000 万行加载到内存中。

最好的选择(在 BCP/BULK INSERT/OPENROWSET(BULK...) 之外(是通过表值参数 (TVP( 将文件的内容流式传输到数据库中。 通过使用 TVP,您可以打开文件,读取一行并发送一行直到完成,然后关闭文件。此方法的内存占用量仅为一行。 我写了一篇文章,从应用程序将数据流式传输到SQL Server 2008,其中有一个场景的示例。

结构

的简单概述如下。我假设导入表和字段名称与上述问题所示相同。

必需的数据库对象:

-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO
-- Second: Use the UDTT as an input param to an import proc.
--         Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;
-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;
INSERT INTO dbo.DATAs (DatasField)
    SELECT  Field
    FROM    @ImportTable;
GO

下面提供了使用上述 SQL 对象的 C# 应用代码。请注意,在此方法中,不是填充对象(例如 DataTable(然后执行存储过程,而是执行存储过程来启动文件内容的读取。存储过程的输入参数不是变量;它是一个方法的返回值,GetFileContents .当SqlCommand调用 ExecuteNonQuery 时调用该方法,这将打开文件,读取一行并通过IEnumerable<SqlDataRecord>yield return构造将该行发送到 SQL Server,然后关闭该文件。 存储过程只看到一个表变量,@ImportTable,一旦数据开始传入,就可以访问该变量(注意:数据确实会在 tempdb 中保留很短的时间,即使不是完整的内容也是如此(。

using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;
private static IEnumerable<SqlDataRecord> GetFileContents()
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
   };
   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
   StreamReader _FileReader = null;
   try
   {
      _FileReader = new StreamReader("{filePath}");
      // read a row, send a row
      while (!_FileReader.EndOfStream)
      {
         // You shouldn't need to call "_DataRecord = new SqlDataRecord" as
         // SQL Server already received the row when "yield return" was called.
         // Unlike BCP and BULK INSERT, you have the option here to create a string
         // call ReadLine() into the string, do manipulation(s) / validation(s) on
         // the string, then pass that string into SetString() or discard if invalid.
         _DataRecord.SetString(0, _FileReader.ReadLine());
         yield return _DataRecord;
      }
   }
   finally
   {
      _FileReader.Close();
   }
}

上面的GetFileContents方法用作存储过程的输入参数值,如下所示:

public static void test()
{
   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;
   SqlParameter _TVParam = new SqlParameter();
   _TVParam.ParameterName = "@ImportTable";
   _TVParam.TypeName = "dbo.ImportStructure";
   _TVParam.SqlDbType = SqlDbType.Structured;
   _TVParam.Value = GetFileContents(); // return value of the method is streamed data
   _Command.Parameters.Add(_TVParam);
   try
   {
      _Connection.Open();
      _Command.ExecuteNonQuery();
   }
   finally
   {
      _Connection.Close();
   }
   return;
}

附加说明:

  1. 通过一些修改,上面的 C# 代码可以适应批处理数据。
  2. 只需稍作修改,上面的 C# 代码就可以适应在多个字段中发送(如"Steaming Data..."中显示的示例(。上面链接的文章在 2 个字段中传递(。
  3. 您还可以在 proc 的 SELECT 语句中操作每条记录的值。
  4. 您还可以通过在过程中使用 WHERE 条件筛选出行。
  5. 您可以多次访问 TVP 表变量;它是只读的,但不是"仅转发"。
  6. SqlBulkCopy相比的优势:
    1. SqlBulkCopy是仅插入的,而使用 TVP 允许以任何方式使用数据:您可以调用MERGE;您可以根据某些条件DELETE;您可以将数据拆分为多个表;等等。
    2. 由于 TVP 不是仅插入的,因此不需要单独的临时表将数据转储到其中。
    3. 您可以通过调用 ExecuteReader 而不是 ExecuteNonQuery 从数据库中获取数据。例如,如果导入DATAs表上有一个IDENTITY字段,则可以向INSERT添加一个OUTPUT子句以传递回INSERTED.[ID](假设IDIDENTITY字段的名称(。或者,您可以传回完全不同的查询结果,或者两者兼而有之,因为可以通过Reader.NextResult()发送和访问多个结果集。使用SqlBulkCopy时不可能从数据库中获取信息,但是在S.O.上有几个问题,人们想要这样做(至少在新创建的IDENTITY值方面(。
    4. 有关为什么整个过程有时更快(即使将数据从磁盘导入 SQL Server 的速度稍慢(的详细信息,请参阅 SQL Server 客户咨询团队的此白皮书:使用 TVP 最大化吞吐量

在 C# 中,最好的解决方案是让SqlBulkCopy读取文件。为此,您需要将IDataReader直接传递给SqlBulkCopy.WriteToServer方法。下面是一个例子:http://www.codeproject.com/Articles/228332/IDataReader-implementation-plus-SqlBulkCopy

最好的方法是混合使用第一个解决方案和第二个解决方案,创建DataTable并在循环中添加行,然后使用BulkCopy上传到一个连接中的数据库 在大容量复制中使用它获得帮助

需要注意的另一件事是,批量复制是一个非常敏感的操作,几乎每个错误都会使副本无效,例如,如果您在 dataTable 中将列名声明为"text",而在数据库中将其声明为"text",它将抛出异常,祝你好运。

如果要在最短的时间内插入 1000 万条记录以使用 SQL 查询进行直接测试,则应使用此查询

 CREATE TABLE TestData(ID INT IDENTITY (1,1), CreatedDate DATETIME)
 GO
 INSERT INTO TestData(CreatedDate) SELECT GetDate()
 GO 10000000

最新更新