我必须使用 AWS SES 发送大量电子邮件(例如每个作业 10000 封)。找到了一个关于如何并行执行此操作的很棒的博客,现在有一个关于如何将发送事务数据写入数据库的问题。 我正在使用 npoco ORM 和 InsertBulk,从我的粗略看,它通过遍历每个 poco 来打开连接并插入,然后关闭连接。除了每次发送的打开、写入和关闭之外,这是一个进步。 我在这里的想法是将数据库操作保持在最低限度,但我应该每发送 50 封左右的电子邮件就写给数据库,以防服务器或作业中断,作业以后可以从中断的地方继续,而不会发送重复项等。
所以我开始使用ConcurrentBag,线程锁定,转换为列表,将该列表发送到npoco插入等。 测试非常有限,它可以工作。但我确信这不是正确的方法,而且我没有信心在这里正确使用线程。此方案中有哪些建议?将并发包传递给 npoco 进行插入(其他插入方法)会更好或可行吗?
var bag = new ConcurrentBag<EmailSent>();
Parallel.ForEach(recipients.AsParallel(), new ParallelOptions { MaxDegreeOfParallelism = maxParallelEmails },
recipient =>
{
var response = client.SendEmail(request);
bag.Add(new EmailSent() { JobId = jobId, MessageId = response.MessageId});
}
lock (syncRoot)
{
count++;
if (count % 50 == 0 || count == recipients.Count)
{
var list = new List<EmailSent>();
while (!bag.IsEmpty)
{
EmailSent email;
if (bag.TryTake(out email))
{
list.Add(email);
}
}
repo.InsertBulk<EmailSent>(list);
}
});
如果您只是在寻找优化,一种是对插入使用表值参数,以便将多个记录发送到存储过程,而不是为每个插入调用一次。
在 SQL 服务器上,定义参数类型,这看起来很像定义表。(大多数示例来自上面的链接。
CREATE TYPE dbo.CategoryTableType AS TABLE
( CategoryID int, CategoryName nvarchar(50) )
然后,将该类型的参数添加到插入过程中:
CREATE PROCEDURE usp_UpdateCategories
(@tvpNewCategories dbo.CategoryTableType READONLY)
在存储过程中,可以从该参数中进行选择,就像选择表变量一样。
INSERT INTO dbo.Categories (CategoryID, CategoryName)
SELECT nc.CategoryID, nc.CategoryName FROM @tvpNewCategories AS nc;
这样做的好处是,您可以将所有插入作为单个操作执行。
在应用程序端,您将创建一个与您定义的表类型相对应的数据表。然后,使用要插入的记录填充表。
最后,在调用该过程时,添加一个参数,并将 DataTable 作为其值,指定SqlDbType = SqlDbType.Structured
,"TypeName"是表类型的名称。
SqlParameter tvpParam = insertCommand.Parameters.AddWithValue(
"@tvpNewCategories", yourDataTable);
tvpParam.SqlDbType = SqlDbType.Structured;
tvpParam.TypeName = "dbo.CategoryTableType";
如果您在 2008 年之前使用 SQL Server,您就会知道我们做了一些奇怪的事情来将多个记录传递给过程,例如连接字符串或发送和分析 XML。这要容易得多,并且大大减少了单个存储过程调用的数量。
使用 ConcurrentQueue
代替 ConcurrentBag
。您不必担心在添加到队列或取出项目时锁定任何内容。如果要以 n 为批次保存记录,您可以连续TryDeqeueue
并将取消排队的项目添加到集合中,直到集合计数为 n 或TryDequeue
返回 false
,这意味着队列中没有剩余内容。