SqlBulkCopy WriteToServer具有IDataReader而不是DataTable和编程调整的字段值



我们在C#中有一个工作代码,它利用SqlBulkCopy将记录从存储过程源插入表中。高层:

  1. 从将记录放入DataTable的存储过程中读取数据。本质上调用SP并执行AdpAdapter将记录放入DataTable。让我们将其称为srcDataTable
  2. 通过配置在源和目标之间动态映射列名,该表类似于以下表:
ColumnInDestination空空生日出生日期1900年1月1日年月日
TargetTableName ColumnFromSource默认值格式化
表格A StudentFirstName 名字
表格A StudentLastName 姓氏
表格A

您似乎在问"我可以创建自己的类来实现IDataReader并对Read()方法进行一些更改的逻辑吗">

答案是肯定的;你可以在Read()中编写自己的数据读取器,它可以做任何它喜欢的事情,甚至可以在调用时立即格式化服务器的硬盘。。当你实现一个接口时;扩展DataReader的读取方法";,您提供的是自己的实现,在外部看起来遵守特定的契约,但实现的细节完全取决于您。如果您想在每次读取时将dbX中的一行下拉到临时数组中,请在数组中快速调整值以进行一些默认或其他调整,然后返回true,这很好。。

如果你想在GetXXX中进行值调整,那也没关系。。你在给读者写信,所以你自己决定。大容量复印机要做的就是调用Read,直到它返回false,然后写入它从例如GetValue中获得的数据(如果还不清楚:读取不会产生将要写入的数据,那么GetValue会。读取只是一条移动到下一组必须写入的数据的指令,但它甚至不必这样做。你可以将其实现为{ return DateTime.Now.DayOfWeek == DayOfWeek.Monday; },将GetValue实现为{ return Guid.NewGuid().ToString(); },你的复制操作将花费到23:59:59.999,用guid填充数据库,但只在周一)

这个问题有点不清楚。看起来实际的问题是,在将SqlBulkCopy与数据读取器一起使用之前,是否可以转换数据。

有很多方法可以做到这一点,而合适的方法取决于ETL代码的其余部分是如何做到的。它只适用于数据读取器吗?或者它会加载一批可以在内存中修改的行吗?

使用IEnumerable<gt;和ObjectReader

FastMember的ObjectReader类在任何IEnumerable<T>集合上创建一个IDataReader包装器。这意味着强类型.NET集合迭代器结果都可以发送到SqlBulkCopy。

IEnumerable<string> lines=File.ReadLines(filePath);
using(var bcp = new SqlBulkCopy(connection)) 
using(var reader = ObjectReader.Create(lines, "FileName")) 
{ 
bcp.DestinationTableName = "SomeTable"; 
bcp.WriteToServer(reader); 
}

通过这种方式,可以使用LINQ查询和迭代器方法创建转换管道,并使用ObjectReader将结果提供给SqlBulkCopy。该代码比尝试创建自定义IDataReader简单很多。

在本例中,Dapper可用于以IEnumerable<>:的形式返回查询结果

IEnumerable<Order> orders=connection.Query<Order>("select ... where category=@category",
new {category="Cars"});
var ordersWithDate=orders.Select(ord=>new OrderWithDate {
....
SaleDate=DateTime.Parse(ord.DateString,CultureInfo.GetCultureInfo("en-GB");
});
using var reader = ObjectReader.Create(ordersWithDate, "Id","SaleDate",...));

自定义转换数据读取器

还可以通过实现IDataReader接口来创建自定义数据读取器。像ExcelDataReader和CsvHelper这样的库为它们的结果提供了这样的包装器。CsvHelper的CsvDataReader在解析的CSV结果上创建一个IDataReader包装器。这样做的缺点是IDataReader有很多方法要实现。必须实现GetSchemaTable,以便为以后的转换步骤和SqlBulkCopy提供列和信息。

IDataReader可能是动态的,但它需要添加大量手工编码的类型信息才能工作。在CsvDataReader中,大多数方法只将调用转发到底层CsvReader,例如:

public long GetInt64(int i)
{
return csv.GetField<long>(i);
}
public string GetName(int i)
{
return csv.Configuration.HasHeaderRecord
? csv.HeaderRecord[i]
: string.Empty;
}

GetSchemaTable()是70行,默认值不是最佳值。例如,当解析器已经可以解析日期和数字数据时,为什么要使用sting作为列类型?

解决这一问题的一种方法是使用以前阅读器的模式表的副本创建一个新的自定义IDataReader,并添加额外的列。CsvDataReader的构造函数接受一个DataTable schemaTable参数来处理它自己的GetSchemaTable不够好的情况。可以修改DataTable以添加额外的列:

/// <param name="csv">The CSV.</param>
/// <param name="schemaTable">The DataTable representing the file schema.</param>
public CsvDataReader(CsvReader csv, DataTable schemaTable = null)
{
this.csv = csv;
csv.Read();
if (csv.Configuration.HasHeaderRecord)
{
csv.ReadHeader();
}
else
{
skipNextRead = true;
}
this.schemaTable = schemaTable ?? GetSchemaTable();
}

可以创建一个DerivedColumnReader,它在构造函数中就是这样做的

public DerivedColumnReader<TSource,TResult>(string sourceName, string targetname,Fun<TSource,TResult> func,DataTable schemaTable)
{
...
AddSchemaColumn(schemaTable);
_schemaTable=schemaTable;
}
void AddSchemaColumn(DataTable dt,string targetName)
{
var row = dt.NewRow();
row["AllowDBNull"] = true;
row["BaseColumnName"] = targetName;
row["ColumnName"] = targetName;
row["ColumnMapping"] = MappingType.Element;              
row["ColumnOrdinal"] = dt.Rows.Count+1;
row["DataType"] = typeof(TResult);
//20-30 more properties
dt.Rows.Add(row);
}

LINQ消除了很多锅炉板。

只是提供了它的闭包。因此,真正的主要问题是,在存储过程中不使用FETCH和OFFSET的情况下,如何避免从SQL中获取数据时遇到内存不足的异常。该解决方案不需要使用类似于SqlDataReader的自定义阅读器,而是添加计数检查并批量调用SqlBulkCopy。代码类似于下面所写的内容:

using (var dataReader = sqlCmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
int rowCount = 0;
while (dataReader.Read())
{
DataRow dataRow = SourceDataSet.Tables[source.ObjectName].NewRow();
for (int i = 0; i < SourceDataSet.Tables[source.ObjectName].Columns.Count; i++)
{
dataRow[(SourceDataSet.Tables[source.ObjectName].Columns[i])] = dataReader[i];
}
SourceDataSet.Tables[source.ObjectName].Rows.Add(dataRow);
rowCount++;
if (rowCount % recordLimitPerBatch == 0)
{
// Apply our field mapping
ApplyFieldMapping();
// Write it up
WriteRecordsIntoDestinationSQLObject();
// Remove from our dataset once we get to this point
SourceDataSet.Tables[source.ObjectName].Rows.Clear();
}
}
}

其中ApplyFieldMapping()对数据表的内容进行字段特定的更改,WriteRecordsIntoDestinationSqlObject()。这允许我们只调用一次存储过程来获取数据,并让循环通过写入记录并在达到预设记录LimitPerBatch后清除记录来控制内存。

相关内容

  • 没有找到相关文章

最新更新