我正在尝试开发一个使用并行数据处理和MySQL的应用程序。下面是我遇到问题的代码:
/// <summary>
/// Queries <c>testtable</c> for the rows whose <c>Date</c> column equals
/// <paramref name="day"/> and parses the returned rows in parallel.
/// </summary>
/// <param name="day">The day to select; only its date part is used in the query.</param>
/// <returns>
/// A dictionary keyed by the first comma-separated field of each row string
/// produced by <c>ReadData</c>; the value is the first record parsed from the
/// remainder of that row. If a key occurs more than once, the entry added
/// first is kept because <c>TryAdd</c> does not overwrite.
/// </returns>
public ConcurrentDictionary<string, Info> GetDatabaseForCurrentDay(System.DateTime day)
{
    // Format the date explicitly instead of splitting ToShortDateString() on '.',
    // which only works on machines whose short date pattern is dd.MM.yyyy.
    // The value comes from a DateTime, so it cannot carry injected SQL.
    string isoDate = day.ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture);
    string sqlQuery = "SELECT * FROM testtable WHERE Date ='" + isoDate + "';";

    ConcurrentDictionary<string, Info> info = new ConcurrentDictionary<string, Info>();

    Parallel.ForEach(ReadData(ConnectionString, sqlQuery), data =>
    {
        // Split at the first comma only: [0] is the key, [1] is the rest of the record.
        string[] dataPieces = data.Split(new char[] { ',' }, 2);

        // A fresh engine per iteration, as before.
        // NOTE(review): whether one FileHelperEngine may be shared across threads is not verified here.
        FileHelpers.FileHelperEngine<Info> engine = new FileHelpers.FileHelperEngine<Info>();

        // dayInfo must be local to the lambda. When it was declared outside the loop,
        // all iterations wrote to the same captured variable, so one thread could
        // store another thread's parse result under its own key.
        Info[] dayInfo = engine.ReadString(dataPieces[1], int.MaxValue);
        info.TryAdd(dataPieces[0], dayInfo[0]);
    });

    return info;
}
除此片段外,还值得一提的是函数ReadData(ConnectionString, sqlQuery)
,因为它为循环Parallel.ForEach
提供了参数(即要遍历的数据源)。
/// <summary>
/// Runs <paramref name="queryString"/> against the <c>testdatabase</c> schema and
/// lazily yields each result row as one comma-separated line.
/// </summary>
/// <param name="connectionString">Connection string used to open the MySQL connection.</param>
/// <param name="queryString">
/// SQL text to execute. The result set is read as: column 0 string, column 1 date,
/// columns 2-5 and 7 double, column 6 unsigned 64-bit integer.
/// </param>
/// <returns>
/// One string per row: the eight columns joined by ','. The date is written as
/// <c>yyyy-MM-dd</c> and numbers use '.' as the decimal separator.
/// </returns>
/// <remarks>
/// This is an iterator: the connection is opened when enumeration starts and is
/// released when the enumerator is disposed or runs to completion.
/// </remarks>
public IEnumerable<string> ReadData(string connectionString, string queryString)
{
    // Invariant formatting yields '.' decimals and Gregorian dates on every machine,
    // replacing the culture-dependent ToString() + Replace(',', '.') workaround.
    System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

    using (MySqlConnection conn = new MySqlConnection(connectionString))
    using (MySqlCommand comm = new MySqlCommand(queryString, conn))
    {
        conn.Open();

        // Select the schema before running the caller's query; dispose the helper command.
        using (MySqlCommand commandUse = new MySqlCommand("USE testdatabase;", conn))
        {
            commandUse.ExecuteNonQuery();
        }

        // NOTE(review): 0 is presumably "no timeout" for this connector — confirm.
        comm.CommandTimeout = 0;

        // The reader was previously never disposed; 'using' closes it even when the
        // consumer stops enumerating early.
        using (MySqlDataReader reader = comm.ExecuteReader())
        {
            // Read() returns false immediately on an empty result set,
            // so a separate HasRows check is unnecessary.
            while (reader.Read())
            {
                StringBuilder sb = new StringBuilder();
                sb.Append(reader.GetString(0)).Append(',');
                sb.Append(reader.GetDateTime(1).ToString("yyyy-MM-dd", invariant)).Append(',');
                sb.Append(reader.GetDouble(2).ToString(invariant)).Append(',');
                sb.Append(reader.GetDouble(3).ToString(invariant)).Append(',');
                sb.Append(reader.GetDouble(4).ToString(invariant)).Append(',');
                sb.Append(reader.GetDouble(5).ToString(invariant)).Append(',');
                sb.Append(reader.GetUInt64(6).ToString(invariant)).Append(',');
                sb.Append(reader.GetDouble(7).ToString(invariant));
                yield return sb.ToString();
            }
        }
    }
}
现在,让我们回到问题上。代码可以编译并运行,但是它返回的结果不正确。我注意到ConcurrentDictionary
包含值与键不匹配的条目 - 简而言之,info.TryAdd(dataPieces[0], dayInfo[0])
可能使用一个线程中的键和另一个线程中的值来插入,因此,数据可能会损坏。我知道这种行为是并行处理的缺点,但是我不能放弃这种并行方式。我尝试了不同的方法来解决此问题,但都没有效果,数据仍然错误。有没有办法既能保持这段代码的执行速度,又能保证数据正确?
您需要将dayInfo
移至并行循环的内部。基本上,它目前是一个共享变量,不断被各个任务覆盖写入,从而产生垃圾结果。如果您将其声明在委托(lambda)内部,那么它在每次迭代中都是一个独立的私有变量,而不会被闭包捕获为共享状态:
// Info[] dayInfo = null; <--Remove this
Parallel.ForEach(ReadData(ConnectionString, sqlQuery), row =>
{
    // Split at the first comma only: key first, remainder of the record second.
    char[] separators = { ',' };
    string[] parts = row.Split(separators, 2);
    var parser = new FileHelpers.FileHelperEngine<Info>();
    // dayInfo is declared inside the lambda, so each iteration owns its own
    // copy instead of sharing one captured variable across threads.
    var dayInfo = parser.ReadString(parts[1], int.MaxValue);
    info.TryAdd(parts[0], dayInfo[0]);
});