我有一个多线程应用程序,它在队列中循环并获取数据,并将这些数据发送到存储过程中,将其插入到我的表中。问题是偶尔会在完全相同的时间插入这些数据,这会导致插入重复的行。现在这些行确实有主键作为id,但是,所有其他列都是完全相同的数据。
这是我的循环,最多可以产生20个线程。
var task = new Task();
foreach(job in jobList)
{
task = Task.Run(() => ProcessJobs(job));
}
Task.WaitAll(task);
每个线程读取自己单独的队列,然后我处理每条消息并将其添加到HashSet中,以确保没有重复的
private async Task<string> ProcessJobs(Job job)
{
var messageData = getMessageFromQueue(message);
HashSet<UserInfo> list = new HashSet<UserInfo>();
foreach(var message in messageData)
{
list.Add(BuildMessage(message));
}
InsertIntoDB(list);
}
public HashSet<UserInfo> BuildMessage(MessageData messageData)
{
return new UserInfo
{
UserName = messageData.UserName,
Address = messageData.Address,
AccountType = messageData.Campaign?.AccountType == "G" ? "Type1" :"Type2",
AccountNumber = messageData.AccountList != null ? messageData.AccountList[0].ToString() : string.Empty.
}
}
public struct UserInfo
{
public string UserName { get; set; }
public string Address { get; set; }
public string AccountType { get; set; }
public string AccountNumber { get; set; }
}
每个消息都被处理并作为表值参数发送到数据库,以插入语句
public async Task<int> InsertIntoDB(HashSet<UserInfo> list)
{
// First convert the hashset to a dataTable
var dataTable = list.ToDatatable();
// Convert to a TVP
var params = new DynamicParameters();
parameters.Add("@TVP_UserInfo", dataTable.AsTableValuedParameter("[dbo].[InsertUserInfo]"));
using (var conn = new SqlConnection(ConfigurationManager.AppSettings["DatabaseConnection"]))
{
result = await conn.ExecuteAsync("InsertStoredProcedure", params, commanyType: CommandType.StoredProcedure);
}
}
public DataTable ToDataTable<T>(this HashSet<T> iHashSet)
{
DataTable dataTable = new DataTable();
PropertyDescriptorCollection props = TypeDescriptor.GetProperties(typeof(T));
for (int i = 0; i < props.Count; i++)
{
PropertyDescriptor propertyDescriptor = props[i];
Type type = propertyDescriptor.PropertyType;
if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
type = Nullable.GetUnderlyingType(type);
dataTable.Columns.Add(propertyDescriptor.Name, type);
}
object[] values = new object[props.Count];
foreach (T iListItem in iHashSet)
{
for (int i = 0; i < values.Length; i++)
{
values[i] = props[i].GetValue(iListItem);
}
dataTable.Rows.Add(values);
}
return dataTable;
}
insert语句读取TVP并插入
CREATE PROCEDURE [InsertStoredProcedure]
(@TVP_UserInfo dbo.TVP_UserInfo READONLY)
AS
BEGIN
DECLARE @currentDate datetime = CURRENT_TIMESTAMP
INSERT INTO MyTable (UserName, Address,
AccountType, AccountNumber, AccountDisplay,
CreatedDate)
SELECT
UserName, Address,
AccountType, AccountNumber,
CASE
WHEN AccountNumber IS NULL
THEN ''
ELSE 'Anonymous'
END,
@currentDate
FROM
@TVP_UserInfo
END
这是UDT创建
CREATE TYPE [dbo].[TVP_UserInfo]
AS TABLE
(
UserName,
Address,
AccountType,
AccountNumber
)
我偶尔会收到重复的消息,我不知道它们是如何或从哪里来的,因为每条消息都应该是唯一的,因为我使用的是哈希集。
我认为是多线程导致了这种情况,然而,如果我只运行一个任务,有时我仍然会得到重复的任务。如果您注意到创建的日期一直到毫秒都是完全相同的。Id
(主键(不同,但剩余的行数据实际上是重复的。
结果看起来像这个
1 | Joe | JoesAddress1<12346>匿名2022-08-01 01:45:52:352 |
1 | Joe | JoesAddress1<12346>匿名者2022-08-01 01:45:52:352 |
是否允许UserName在数据库中有重复项?如果它不能包含重复项,我建议在该列上添加一个唯一的索引(至少在开发中是这样(。这可能有助于您捕获导致重复的代码。
我可以看到以下几点:首先,你需要等待所有的任务,而不仅仅是最后一个。
var tasks = new List<Task>
foreach(job in jobList)
{
tasks.add(Task.Run(() => ProcessJobs(job)));
}
Task.WaitAll(tasks.ToArray());
其次,我看不出ProcessJobs代码块将如何工作。
- 消息变量超出范围
- InsertIntoDB没有等待
- 字符串没有返回值
但是,我认为您遇到的问题是,代码将有多个线程访问getMessageFromQueue。那么is及其依赖项是可重入的和线程安全的吗?如果所有的工作都是同步的,你可以使用一个锁定对象来限制它如果你有其他异步工作正在进行,最好使用SemaphoreSlim而不是锁,但锁会给你这个想法。
锁定示例
private lockobj = new lockobj();
private async Task<string> ProcessJobs(Job job)
{
lock (lockobj)
{
var messageData = getMessageFromQueue(message);
}
/// rest of your code .... and return value
}