使用 protobuf 流式压缩 IDataReader



我们需要大幅减少后端服务在将数据拉取和推送到 sql 时使用的带宽。SqlClient使用的TDS流相当臃肿。多年来,人们在从sql中提取时一直要求压缩选项,但Microsoft没有添加它。

我想看看是否有人对处理这个问题的最佳方法有任何想法。这是我到目前为止尝试过的:

  1. 我修改了 https://github.com/MindFlavor/TDSBridge 以在套接字层添加压缩和解压缩。由于有效负载是SSL加密的,因此没有太大区别。

  2. 接下来,我将 IDataReader 带到 Protobuf 库找到:https://github.com/dotarj/protobuf-net-data 和 TCP 框架,在 https://github.com/jchristn/WatsonTcp 中找到,试图创建一个客户端服务器代理,通过将 IDataReader 转换为 protobuf,然后压缩此流,并在另一端执行相反的操作,通过网络流式传输 IDataReader。

我在这里得到了一个概念验证,实际上与纯 TDS 流相比,网络上的原始字节减少了 84% 到 98%。缺点是 WatsonTcp 希望您在分配流时传入内容长度。但是在你创建整个protobuf流之前,没有办法知道这一点。我们有时会一举转移数百场演出,所以这是行不通的。

我不明白protobuf-net-data如何通过grpc进行流式传输,即使可以,我担心IAsyncEnumerable中记录的粒度性质可能会减慢大型传输的速度。

毫无疑问,我可以坐下来通过TCP流实现编写一个完全自定义的原始套接字压缩protobuf,客户端的表面积接近SqlCommand,我只知道这是众所周知的难以正确处理。

有什么节省时间的想法吗?如果没有,也许我会用它做一个开源项目。

下面是可用于获取大型查询并将其作为一系列批处理传输的模式,其中每个批处理都是压缩的二进制序列化数据表。 在传输和反序列化之后,每个数据表都可以由 SqlBulk Copy 直接使用。 相同的模式可以与其他格式一起使用,但在传递给 SqlBulkCopy 之前需要额外的转换器。

using System.Data.SqlClient;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
namespace BatchingDataReader
{
class BatchingDataReader : IDataReader
{
private int batchSize;
private IDataReader rdr;
private int rowsRead;
private bool atEnd;
private int batchesRead;
public BatchingDataReader(IDataReader rdr, int batchSize)
{
this.batchSize = batchSize;
this.rdr = rdr;
}
public object this[int i] => rdr[i];
public object this[string name] => rdr[name];
public int Depth => rdr.Depth;
public bool IsClosed => rdr.IsClosed;
public int RecordsAffected => rdr.RecordsAffected;
public int FieldCount => rdr.FieldCount;
public void Close()
{
if (!atEnd)
return;
rdr.Close();
}
public void Dispose()
{
if (!atEnd)
return;
rdr.Dispose();
}
public bool GetBoolean(int i)
{
return rdr.GetBoolean(i);
}
public byte GetByte(int i)
{
return rdr.GetByte(i);
}
public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
{
return rdr.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
}
public char GetChar(int i)
{
return rdr.GetChar(i);
}
public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
{
return rdr.GetChars(i, fieldoffset, buffer, bufferoffset, length);
}
public IDataReader GetData(int i)
{
return rdr.GetData(i);
}
public string GetDataTypeName(int i)
{
return rdr.GetDataTypeName(i);
}
public DateTime GetDateTime(int i)
{
return rdr.GetDateTime(i);
}
public decimal GetDecimal(int i)
{
return rdr.GetDecimal(i);
}
public double GetDouble(int i)
{
return rdr.GetDouble(i);
}
public Type GetFieldType(int i)
{
return rdr.GetFieldType(i);
}
public float GetFloat(int i)
{
return rdr.GetFloat(i);
}
public Guid GetGuid(int i)
{
return rdr.GetGuid(i);
}
public short GetInt16(int i)
{
return rdr.GetInt16(i);
}
public int GetInt32(int i)
{
return rdr.GetInt32(i);
}
public long GetInt64(int i)
{
return rdr.GetInt64(i);
}
public string GetName(int i)
{
return rdr.GetName(i);
}
public int GetOrdinal(string name)
{
return rdr.GetOrdinal(name);
}
public DataTable GetSchemaTable()
{
return rdr.GetSchemaTable();
}
public string GetString(int i)
{
return rdr.GetString(i);
}
public object GetValue(int i)
{
return rdr.GetValue(i);
}
public int GetValues(object[] values)
{
return rdr.GetValues(values);
}
public bool IsDBNull(int i)
{
return rdr.IsDBNull(i);
}
public bool NextResult()
{
if (!atEnd)
{
batchesRead += 1;
rowsRead = 0;
return true;
}
if (IsClosed)
return false;
return rdr.NextResult();
}
public bool Read()
{
if (rowsRead >= batchSize)
return false;
rowsRead += 1;
atEnd = !rdr.Read();
return !atEnd;
}
public static IEnumerable<DataTable> Read(SqlDataReader r, int batchSize)
{
var rdr = new BatchingDataReader(r, batchSize);
do
{
var dt = new DataTable();
dt.TableName = "table";
dt.Load(rdr);
yield return dt;
} while (rdr.NextResult());
}
}
class Program
{
static void Main(string[] args)
{
var constr = "server=localhost;database=master;integrated security=true";
var outfile = "c:\temp\out.bin";
if (File.Exists(outfile))
File.Delete(outfile);
using (var con = new SqlConnection(constr))
{
//322,162,200  TDS raw
//235,355,311  binary uncompressed out.bin
// 52,755,181  binary GZ Fastest
// 43,061,121  binary GZ optimal
// 65,282,624  XML GZ fastest
// 41,892,056  binary GZ optimal 100,000 row batches
con.Open();
var bin = new BinaryFormatter();
var cmd = new SqlCommand("select top (1000000) * from sys.messages m, sys.objects o", con);
using (SqlDataReader rdr = cmd.ExecuteReader())
using (var destFile = File.OpenWrite(outfile))
using (var zipStream = new System.IO.Compression.GZipStream(destFile,System.IO.Compression.CompressionLevel.Optimal))
{
foreach (var dt in BatchingDataReader.Read(rdr, 10000))
{
Console.WriteLine(dt.Rows.Count);
dt.RemotingFormat = SerializationFormat.Binary;
bin.Serialize(zipStream, dt);
}
}
}
}
}
}

您可以使用此技术让 SQL Server 将结果格式化为 gzip 的 csv(调整组中每结果行数的依据 - 1000 大约是 gzip 开销减少的地方(:

with csv as (
select n = row_number() over (order by (select null)),
line = convert(nvarchar(max), concat(
message_id, ',', language_id, ',', severity, ',',
is_event_logged, ',', '"' + replace([text], '"', '""') + '"'))
from sys.messages)
select compress(string_agg(line, char(13)) within group (order by n))
from csv group by n / 1000

..如果您在 SQL Server 上遇到实际的出口瓶颈,这应该会有所帮助。将其实现为 TDSBridge 会很有趣,该 TDSBridge 重写查询,然后将结果转换回客户端的期望。

最新更新