我一直在努力让我认为最简单的线程形式在我的应用程序中工作,但我就是做不到。
我想做的是:我有一个主表单,上面有一个状态条和一个进度条。我必须阅读3到99个文件,并将它们的哈希添加到一个字符串[]中,我想将该字符串添加到具有各自哈希的所有文件的列表中。之后,我必须将该列表中的项目与数据库(以文本文件形式提供)进行比较。完成所有这些之后,我必须将主表单中的文本框和进度条更新为33%;大多数情况下,我只是不希望主表单在处理过程中冻结。
我正在处理的文件的总和总是高达1.2GB(+/-几MB),这意味着我应该能够将它们读取到字节[]中并从那里进行处理(我必须计算每个文件的CRC32、MD5和SHA1,这样应该比从HDD读取所有文件快3次)。
此外,我还应该注意,一些文件可能是1MB,而另一个文件可能是1GB。我最初想为99个文件创建99个线程,但这似乎不明智,我想最好在较大的文件线程仍在运行时重用小文件的线程。但这听起来很复杂,所以我也不确定这是否明智。
到目前为止,我已经尝试过workerThreads和backgroundWorkers,但它们似乎都不适合我;至少幕后工作人员有一段时间在工作,但我甚至不明白他们为什么其他时候不工作。。。无论哪种方式,主要形式仍然冻结。现在我已经阅读了.NET 4.0中的任务并行库,但我认为在浪费更多时间之前,我最好先问一个知道自己在做什么的人。
我想做的事情看起来像这样(没有线程):
List<string[]> fileSpecifics = new List<string[]>();
int fileMaxNumber = 42; // something between 3 and 99, depending on file set
for (int i = 1; i <= fileMaxNumber; i++)
{
string fileName = "C:\path\to\file" + i.ToString("D2") + ".ext"; // file01.ext - file99.ext
string fileSize = new FileInfo(fileName).Length.ToString();
byte[] file = File.ReadAllBytes(fileName);
// hash calculations (using SHA1CryptoServiceProvider() etc., no problems with that so I'll spare you that, return strings)
file = null; // I didn't yet check if this made any actual difference but I figured it couldn't hurt
fileSpecifics.Add(new string[] { fileName, fileSize, fileCRC, fileMD5, fileSHA1 });
}
// look for files in text database mentioned above, i.e. first check for "file bundles" with the same amount of files I have here; then compare file sizes, then hashes
// again, no problems with that so I'll spare you that; the database text files are pretty small so parsing them doesn't need to be done in an extra thread.
有人能帮我指出正确的方向吗?我正在寻找一种最简单的方法来快速读取和散列这些文件(我相信散列需要一些时间,而其他文件已经可以读取了),并将输出保存到字符串[]中,而不会冻结主形式,更多,更少。
我感谢您的意见。
编辑以澄清:我所说的"后台工作人员有时工作"是指(对于同一组文件),也许我的代码的第一次和第四次执行会产生正确的输出,UI在5秒内解冻,第二次,第三次和第五次执行它冻结了表单(60秒后,我收到一条错误消息,说某个线程在该时间段内没有响应),我必须通过VS停止执行。
感谢你们所有的建议和建议,正如你们正确猜测的那样,我对线程完全陌生,必须阅读你们发布的很棒的链接。然后我会尝试这些方法,并标记出对我帮助最大的答案。再次感谢!
使用.NET Framework 4.X
- 使用Directory.EnumerateFiles方法进行高效/惰性文件枚举
- 使用Parallel.For()将并行工作委派给PLINQ框架,或使用TPL将单个任务委派给每个管道阶段
- 使用管道模式来管道化以下阶段:计算哈希代码、与模式进行比较、更新UI
- 为了避免UI冻结,请使用适当的技术:对于WPF,请使用Dispatcher.BeginInvoke(),对于WinForms,请使用Invoke(
- 考虑到所有这些东西都有UI,如果需要,添加一些取消功能以放弃长时间运行的操作可能会很有用,请查看CreateLinkedTokenSource类,它允许从"外部范围"触发
CancellationToken
我可以尝试添加一个例子,但值得你自己做,这样你就可以学习所有这些东西,而不是简单地复制/粘贴->让它工作->忘记它
附言:必读-MSDN 上的管道论文
TPL特定管道实施
- 管道模式实现:三个阶段:计算哈希、匹配、更新UI
- 三项任务,每个阶段一项
- 两个阻塞队列
//
// 1) CalculateHashesImpl() should store all calculated hashes here
// 2) CompareMatchesImpl() should read input hashes from this queue
// Tuple.Item1 - hash, Typle.Item2 - file path
var calculatedHashes = new BlockingCollection<Tuple<string, string>>();
// 1) CompareMatchesImpl() should store all pattern matching results here
// 2) SyncUiImpl() method should read from this collection and update
// UI with available results
var comparedMatches = new BlockingCollection<string>();
var factory = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
var calculateHashesWorker = factory.StartNew(() => CalculateHashesImpl(...));
var comparedMatchesWorker = factory.StartNew(() => CompareMatchesImpl(...));
var syncUiWorker= factory.StartNew(() => SyncUiImpl(...));
Task.WaitAll(calculateHashesWorker, comparedMatchesWorker, syncUiWorker);
CalculateHashesImpl():
private void CalculateHashesImpl(string directoryPath)
{
foreach (var file in Directory.EnumerateFiles(directoryPath))
{
var hash = CalculateHashTODO(file);
calculatedHashes.Add(new Tuple<string, string>(hash, file.Path));
}
}
CompareMatchesImpl():
private void CompareMatchesImpl()
{
foreach (var hashEntry in calculatedHashes.GetConsumingEnumerable())
{
// TODO: obviously return type is up to you
string matchResult = GetMathResultTODO(hashEntry.Item1, hashEntry.Item2);
comparedMatches.Add(matchResult);
}
}
SyncUImpl():
private void UpdateUiImpl()
{
foreach (var matchResult in comparedMatches.GetConsumingEnumerable())
{
// TODO: track progress in UI using UI framework specific features
// to do not freeze it
}
}
TODO:考虑将CancellationToken
用作所有GetConsumingEnumerable()
调用的参数,以便在需要时轻松停止管道执行。
首先,您应该使用更高级别的抽象来解决这个问题。您有一堆任务要完成,所以请使用"任务"抽象。您应该使用任务并行库来做这类事情。让TPL处理创建多少工作线程的问题——如果工作在I/O上被门控,答案可能低至一个。
如果你真的想自己处理线程,一些好建议:
-
永远不要阻塞UI线程。这就是冻结你的申请的原因。想出一个协议,通过该协议,工作线程可以与UI线程通信,然后UI线程除了响应UI事件之外什么都不做。请记住,用户界面控件(如任务完成栏)的方法决不能由UI线程之外的任何其他线程调用。
-
不要创建99个线程来读取99个文件。这就像收到99封邮件,雇佣99名助理撰写回复:这是一个非常昂贵的简单问题解决方案。如果您的工作是CPU密集型,那么"雇佣"的线程数量就没有必要超过为它们提供服务的CPU数量。(这就好比在一个只有四张办公桌的办公室里雇佣了99名助理。这些助理大部分时间都在等待一张办公桌,而不是阅读你的邮件。)如果你的工作是磁盘密集型的,那么大多数线程在等待磁盘的大部分时间都将闲置,这是对资源的更大浪费。
首先,我希望您使用内置库来计算哈希。你可以自己写,但使用已经存在一段时间的东西要安全得多。
如果您的进程是CPU密集型的,那么您可能只需要创建与CPU一样多的线程。如果它被I/O绑定,那么您可能可以处理更多的线程。
我不建议将整个文件加载到内存中。您的哈希库应该支持一次更新一个区块。将一个区块读取到内存中,使用它更新每个算法的哈希,读取下一个区块,并重复直到文件结束。分块方法将有助于降低程序的内存需求。
正如其他人所建议的,研究任务并行库,特别是数据并行性。这可能很简单:
Parallel.ForEach(fileSpecifics, item => CalculateHashes(item));
查看TPL数据流。你可以使用一个节流的ActionBlock,它将为你管理困难的部分。
如果我认为您希望在后台执行一些任务,而不是阻止您的UI,那么UI BackgroundWorker将是一个合适的选择。你提到你有一段时间让它工作,所以我的建议是把你在半工作状态下的东西拿出来,并通过追踪故障来改进它。如果我的预感是正确的,那么你的工作人员抛出了一个异常,而你在代码中似乎没有处理这个异常。未经处理的异常从其包含的线程中冒出,会导致糟糕的事情发生。
这段代码使用两个任务对一个文件(流)进行哈希处理——一个用于读取,另一个用于哈希处理,以获得更健壮的方式向前读取更多块。
因为处理器的带宽比磁盘的带宽高得多,除非您使用一些高速闪存驱动器,否则您不会从同时散列更多文件中获得任何好处。
public void TransformStream(Stream a_stream, long a_length = -1)
{
Debug.Assert((a_length == -1 || a_length > 0));
if (a_stream.CanSeek)
{
if (a_length > -1)
{
if (a_stream.Position + a_length > a_stream.Length)
throw new IndexOutOfRangeException();
}
if (a_stream.Position >= a_stream.Length)
return;
}
System.Collections.Concurrent.ConcurrentQueue<byte[]> queue =
new System.Collections.Concurrent.ConcurrentQueue<byte[]>();
System.Threading.AutoResetEvent data_ready = new System.Threading.AutoResetEvent(false);
System.Threading.AutoResetEvent prepare_data = new System.Threading.AutoResetEvent(false);
Task reader = Task.Factory.StartNew(() =>
{
long total = 0;
for (; ; )
{
byte[] data = new byte[BUFFER_SIZE];
int readed = a_stream.Read(data, 0, data.Length);
if ((a_length == -1) && (readed != BUFFER_SIZE))
data = data.SubArray(0, readed);
else if ((a_length != -1) && (total + readed >= a_length))
data = data.SubArray(0, (int)(a_length - total));
total += data.Length;
queue.Enqueue(data);
data_ready.Set();
if (a_length == -1)
{
if (readed != BUFFER_SIZE)
break;
}
else if (a_length == total)
break;
else if (readed != BUFFER_SIZE)
throw new EndOfStreamException();
prepare_data.WaitOne();
}
});
Task hasher = Task.Factory.StartNew((obj) =>
{
IHash h = (IHash)obj;
long total = 0;
for (; ; )
{
data_ready.WaitOne();
byte[] data;
queue.TryDequeue(out data);
prepare_data.Set();
total += data.Length;
if ((a_length == -1) || (total < a_length))
{
h.TransformBytes(data, 0, data.Length);
}
else
{
int readed = data.Length;
readed = readed - (int)(total - a_length);
h.TransformBytes(data, 0, data.Length);
}
if (a_length == -1)
{
if (data.Length != BUFFER_SIZE)
break;
}
else if (a_length == total)
break;
else if (data.Length != BUFFER_SIZE)
throw new EndOfStreamException();
}
}, this);
reader.Wait();
hasher.Wait();
}
此处为其余代码:http://hashlib.codeplex.com/SourceControl/changeset/view/71730#514336