如果需要以流式方式从SqlServer读取数据,则有一些功能。例如使用SqlDataReader
和CommandBehavior.SequentialAccess
,特别是当需要访问二进制列数据时,有GetStream(int)
方法:
var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";
using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
dr.Read();
var stream = dr.GetStream(0);
// access stream
}
但是,当需要使用SqlBulkCopy
将数据馈送到SqlServer时,尤其是当需要将流作为二进制列的数据源时,反方向的流式数据又如何呢?
我试着跟随
var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();
using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
sbc.DestinationTableName = "#Test";
sbc.EnableStreaming = true;
sbc.ColumnMappings.Add(0, "ID");
sbc.ColumnMappings.Add(1, "Data");
sbc.WriteToServer(new TestDataReader());
}
其中TestDataReader
实现IDataReader
如下:
class TestDataReader : IDataReader
{
public int FieldCount { get { return 2; } }
int rowCount = 1;
public bool Read() { return (rowCount++) < 3; }
public bool IsDBNull(int i) { return false; }
public object GetValue(int i)
{
switch (i)
{
case 0: return rowCount;
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
default: throw new Exception();
}
}
//the rest members of IDataReader
}
它如预期的那样发挥了作用。
但是的变化
case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
至
case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });
导致消息出现异常System.InvalidOperationException
数据源中MemoryStream类型的给定值不能为已转换为指定目标列的varbinary类型。
有没有一种方法可以将Stream
从IDataReader
(或者可能是DbDataReader
)提供给SqlBulkCopy
,作为二进制列的数据源,而无需首先将其所有数据复制到内存(字节数组)中?
不确定这是否有文档记录,但如果对SqlBulkCopy
源代码进行简短检查,您可能会发现它以不同的方式处理不同的数据读取器。首先,SqlBulkCopy
确实支持流式传输和GetStream
,但您可能会注意到IDataReader
接口不包含GetStream
方法。因此,当您将自定义IDataReader
实现提供给SqlBulkCopy
时,它不会将二进制列视为流式,也不会接受Stream
类型的值。
另一方面,DbDataReader
确实有这种方法。如果为SqlBulkCopy
提供DbDataReader
的实例(继承类),它将以流式处理所有二进制列,并调用DbDataReader.GetStream
。
因此,为了解决您的问题,从DbDataReader
继承如下:
class TestDataReader : DbDataReader
{
public override bool IsDBNull(int ordinal) {
return false;
}
public override int FieldCount { get; } = 2;
int rowCount = 1;
public override bool HasRows { get; } = true;
public override bool IsClosed { get; } = false;
public override bool Read()
{
return (rowCount++) < 3;
}
public override object GetValue(int ordinal) {
switch (ordinal) {
// do not return anything for binary column here - it will not be called
case 0:
return rowCount;
default:
throw new Exception();
}
}
public override Stream GetStream(int ordinal) {
// instead - return your stream here
if (ordinal == 1)
return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
throw new Exception();
}
// bunch of irrelevant stuff
}
请参阅以下代码
static int SendOrders(int totalToSend)
{
using (SqlConnection con = new SqlConnection(connectionString))
{
con.Open();
using (SqlTransaction tran = con.BeginTransaction())
{
var newOrders =
from i in Enumerable.Range(0, totalToSend)
select new Order
{
customer_name = "Customer " + i % 100,
quantity = i % 9,
order_id = i,
order_entry_date = DateTime.Now
};
SqlBulkCopy bc = new SqlBulkCopy(con,
SqlBulkCopyOptions.CheckConstraints |
SqlBulkCopyOptions.FireTriggers |
SqlBulkCopyOptions.KeepNulls, tran);
bc.BatchSize = 1000;
bc.DestinationTableName = "order_queue";
bc.WriteToServer(newOrders.AsDataReader());
tran.Commit();
}
con.Close();
}
return totalToSend;
}