如何在发送消息之前和之后以及处理消息之前和之后将中间件添加到 Rebus 消息处理管道?



我需要它来简化以下典型例行操作的实现:

  1. 我想在发送消息之前捕获用户的上下文,并在处理消息之前恢复用户上下文,类似于以下旧示例中的操作方式:https://github.com/rebus-org/RebusSamples/tree/master/old/UserContextHeaders

  2. 我想在处理消息之前对其进行验证和重复数据删除,并在处理消息后记录结果。

正如问题作者正确指出的那样,Rebus.Events 包提供了可读且可访问的方式来挂钩到发送/接收消息之前/之后。

如果这就足够了,我肯定会同意。

但是,例如,如果您想在try/finally中包装单个消息的整个处理(我建议您在恢复发送用户的身份以处理消息时执行此操作),您可能需要查看基于装饰器的本机扩展机制。

您可以阅读有关如何通过修饰其管道来扩展 Rebus 的可扩展性的 wiki 页面。

例如,若要在处理消息之前和之后对当前声明主体执行某些操作,可以实现如下所示的"传入管道步骤":

[StepDocumentation("Write a nice descriptoion here")]
class MyIncomingStep : IIncomingStep
{
public async Task Process(IncomingStepContext context, Func<Task> next)
{
var originalPrincipal = ClaimsPrincipal.Current;
try 
{
// establish user identity here
ClaimsPrincipal.Current = ...
// handle message
await next();
}
finally 
{
ClaimsPrincipal.Current = originalPrincipal;
}
}
}

然后,您可以使用"步骤注入器"装饰 Rebus 的IPipeline,声明性地说明您希望在管道中插入步骤的位置:

.Options(o => {
o.Decorate<IPipeline>(c =>
{
var pipeline = c.Get<IPipeline>();
var stepToInject = new MyIncomingStep();
return new PipelineStepInjector(pipeline)
.OnReceive(stepToInject, PipelineRelativePosition.Before, typeof(DispatchIncomingMessageStep));
});    
})

然后 - 为了让事情变得漂亮 - 您可以将上面的代码包装在扩展方法中以进行OptionsConfigurer,从而获得更漂亮的配置语法:

.Options(o => {
o.RestoreClaimsPrincipalWhenHandlingMessages();
})

或者你认为它应该被称为:)

发送消息时,一切都以类似的方式工作,您只想

//....
return new PipelineStepInjector(pipeline)
.OnSend(stepToInject, PipelineRelativePosition.Before, typeof(SerializeOutgoingMessageStep));

相反。

相关内容

最新更新