在WPF应用程序中,我有一个发布消息的第三方库。
信息如下:
public class DialectMessage
{
public string PathAndQuery { get; private set; }
public byte[] Body { get; private set; }
public DialectMessage(string pathAndQuery, byte[] body)
{
this.PathAndQuery = pathAndQuery;
this.Body = body;
}
}
我从我的app.cs文件中设置了外部消息源:
public partial class App : Application
{
static App()
{
MyComponent.MessageReceived += MessageReceived;
MyComponent.Start();
}
private static void MessageReceived(Message message)
{
//handle message
}
}
这些消息可以一次从多个线程发布,从而可以一次多次调用事件处理程序。
我有一个服务对象,它必须解析传入的消息。此服务实现以下接口:
internal interface IDialectService
{
void Parse(Message message);
}
我在app.cs文件中有一个默认的静态实例:
private readonly static IDialectService g_DialectService = new DialectService();
为了简化解析器的代码,我希望确保一次只解析一条消息。
我还想避免锁定我的事件处理程序,因为我不想阻止第三方对象。
由于这个要求,我不能从我的消息事件处理程序直接调用g_DialectService.Parse
确保这种单线程执行的正确方法是什么
我的第一个想法是将我的解析操作封装在Produce/Customer模式中。为了达到这个目标,我尝试了以下方法:
在我的app.cs:中声明BlockingCollection
private readonly static BlockingCollection<Message> g_ParseOperations = new BlockingCollection<Message>();
更改事件处理程序的主体以添加操作:
private static void MessageReceived(Message message) { g_ParseOperations.Add(message); }
创建一个新线程,从我的应用程序构造函数中抽取集合:
static App() { MyComponent.MessageReceived += MessageReceived; MyComponent.Start(); Task.Factory.StartNew(() => { Message message; while (g_ParseOperations.TryTake(out message)) { g_DialectService.Parse(message); } }); }
但是,此代码似乎不起作用。永远不会调用服务Parse方法。
此外,我不确定这种模式是否能让我正确地关闭应用程序。
为了确保一切正常工作,我需要对代码进行哪些更改
PS:我的目标是.Net 4.5
[Edit]经过一番搜索和ken2k的答案,我可以看到我错误地用trytake代替了take。
我现在更新的代码是:
private readonly static CancellationTokenSource g_ShutdownToken = new CancellationTokenSource();
private static void MessageReceived(Message message)
{
g_ParseOperations.Add(message, g_ShutdownToken.Token);
}
static App()
{
MyComponent.MessageReceived += MessageReceived;
MyComponent.Start();
Task.Factory.StartNew(() =>
{
while (!g_ShutdownToken.IsCancellationRequested)
{
var message = g_ParseOperations.Take(g_ShutdownToken.Token);
g_DialectService.Parse(message);
}
});
}
protected override void OnExit(ExitEventArgs e)
{
g_ShutdownToken.Cancel();
base.OnExit(e);
}
此代码按预期运行。消息按正确顺序处理。然而,一旦我退出应用程序,我就会在Take方法上得到一个"CancelledException",即使我之前刚刚测试了IsCancellationRequested。
文档介绍了BlockingCollection.TryTake(out T item)
:
如果集合为空,此方法将立即返回false。
所以基本上你的循环会立即退出。您可能想要的是使用超时参数调用TryTake方法,并在mustStop
变量变为true
:时退出循环
bool mustStop = false; // Must be set to true on somewhere else when you exit your program
...
while (!mustStop)
{
Message yourMessage;
// Waits 500ms if there's nothing in the collection. Avoid to consume 100% CPU
// for nothing in the while loop when the collection is empty.
if (yourCollection.TryTake(out yourMessage, 500))
{
// Parses yourMessage here
}
}
对于您编辑后的问题:如果您的意思是收到了一个OperationCanceledException
,那没关系,这正是以CancellationToken
对象为参数的方法的行为:)只需捕获异常并优雅地退出。