在观察者的 OnNext() 方法中等待任务是否安全?



我创建了一个自定义观察者,它基本上在其OnNext((方法中执行了一个异步任务。

我想知道这样做是否是一个好主意,考虑到异步空洞不是很好。

public class MessageObserver : IObserver<Message>
{
private IDisposable _unsubscriber;
private readonly IQueueClient _client;
private readonly TelemetryClient _telemetryClient;
public MessageObserver(IQueueClient client, TelemetryClient telemetryClient)
{
_client = client;
_telemetryClient = telemetryClient;
}
public virtual void Subscribe(IObservable<Message> provider)
{
_unsubscriber = provider.Subscribe(this);
}
public virtual void Unsubscribe()
{
_unsubscriber.Dispose();
}
public virtual void OnCompleted()
{
}
public virtual void OnError(Exception error)
{
}
public virtual async void OnNext(Message message)
{
try
{
await _client.SendAsync(message);
}
catch (Exception ex)
{
_telemetryClient.TrackException(ex);
}
}
}

编辑/添加代码

我有一个 API,我将来自 Angular 客户端的资源发布到该 API,一旦资源被记录到数据库中,我立即向 Azure 服务总线发送一条消息,然后返回以前记录的实体。

我不想等待 Azure 服务总线消息发送后再返回到客户端,因此我想通知 Rx 观察者我有一条新消息需要在另一个线程上异步处理。

这是我的结构:

// POST: /api/management/campaign
[HttpPost]
public async Task<IActionResult> Create([FromBody] CampaignViewModel model)
{
try
{
if (ModelState.IsValid)
{
var createdCampaign = await _campaignService.CreateCampaign(Mapping.ToCampaign(model));
_upsertServiceBus.SendMessage(new Message(Encoding.UTF8.GetBytes(createdCampaign.CampaignId.ToString())));
return Ok(Mapping.ToCampaignViewModel(createdCampaign));
}
return BadRequest(ModelState);
}
catch (Exception ex)
{
_telemetryClient.TrackException(ex);
return BadRequest(new OpenIdConnectResponse
{
Error = OpenIdConnectConstants.Errors.InvalidRequest,
ErrorDescription = Constants.GenericError
});
}
}

-

public class BusService : IBusService
{
private readonly IObservable<Message> _messageObservable;
private readonly ICollection<Message> _messages = new Collection<Message>();
private readonly IQueueClient _queueClient;
private readonly MessageObserver _messageObserver;
private readonly TelemetryClient _telemetryClient;
protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
{
_telemetryClient = telemetryClient;
_queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
_messageObservable = _messages.ToObservable();
_messageObserver = new MessageObserver(_queueClient, _telemetryClient);
_messageObserver.Subscribe(_messageObservable);
}
public void SendMessage(Message message)
{
_messageObserver.OnNext(message);
}
}

在@Shlomo的答案的帮助下编辑/解决方案:

public class BusService : IBusService
{
private readonly IQueueClient _queueClient;
private readonly TelemetryClient _telemetryClient;
private readonly Subject<Message> _subject = new Subject<Message>();
protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
{
_telemetryClient = telemetryClient;
_queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
_subject
.ObserveOn(TaskPoolScheduler.Default)
.SelectMany(message =>
{
return Observable.FromAsync(() =>
{
var waitAndRetryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, retryCount, context) =>
{
_telemetryClient.TrackEvent(
$"Sending message to Azure Service Bus failed with exception ${exception.Message}. Retrying...");
}
);
return waitAndRetryPolicy.ExecuteAsync(async ct =>
{
_telemetryClient.TrackEvent("Sending message to Azure Service Bus...");
await _queueClient.SendAsync(message);
}, CancellationToken.None);
});
})
.Subscribe(unit => { _telemetryClient.TrackEvent("Message sent to Azure Service Bus."); },
ex => _telemetryClient.TrackException(ex));
}
public void SendMessage(Message message)
{
_subject.OnNext(message);
}
}

我无法复制或测试,但希望这能让您入门。

此解决方案将_messages_messageObserver_messageObservable替换为主题和反应查询。几点说明:

  • ObserveOn允许您通过更改"调度程序"来转移线程。我选择了将在不同任务上执行其余查询的TaskPoolScheduler
  • 我建议调用_queueClient.SendAsync的同步版本,因为此示例允许 Rx 处理线程。
  • 此解决方案使用 Rx 异常处理,它将在发生异常时终止可观察/处理。如果您希望它自动重新启动,请添加一个.Catch/.Retry

法典:

public class BusService : IBusService
{
private readonly IQueueClient _queueClient;
private readonly TelemetryClient _telemetryClient;
private readonly Subject<Message> _subject = new Subject<Message>();
protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
{
_telemetryClient = telemetryClient;
_queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
_subject
.ObserveOn(TaskPoolScheduler.Default)  // handle on an available task
.Select(msg => _queueClient.Send(msg)) // syncronous, not async, because already on a different task
.Subscribe(result => { /* log normal result */}, ex => _telemetryClient.TrackException(e), () => {});
}
public void SendMessage(Message message)
{
_subject.OnNext(message);
}
}

正如我所提到的,代码使用Subject,你会发现这里有很多问答不推荐它们。如果需要,您可以将主题替换为事件和该事件上的可观察内容。主题更容易展示,如果保密,我会争辩说很好。

相关内容

  • 没有找到相关文章

最新更新