C# : 操作已取消异常 : 操作已取消



下面我正在尝试将数据发送到事件中心,它工作了几分钟,然后抛出了操作取消异常。关于我使用取消令牌的错误的任何提示(如果这是我应该使用的(?或者我该如何解决这个问题?

public async void send<T>(IEnumerable<T> list, string eventhubname)
{                
var token = new CancellationTokenSource();
CancellationToken ct = token.Token;
EventHubProducerClient producer = null;
try
{
producer = new EventHubProducerClient(this._connectionString, eventhubname);
var eventBatch = await producer.CreateBatchAsync(ct); **Line 148 here**
foreach (T item in list)
{
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(item.ToString())));
}
await producer.SendAsync(eventBatch);
await producer.DisposeAsync();
}
catch (Exception ex)
{
//($"Error While sending message to Event Hub: { ex.Message}", ex);
if (producer != null)
{
await producer.DisposeAsync();
}
if (ct.IsCancellationRequested)
{
token.Dispose();
throw new TaskCanceledException(ex.Message);
}
throw;
}
}

以下是异常梗


System.OperationCanceledException: The operation was canceled.
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpCbsLink.SendTokenAsyncResult.<>c__DisplayClass13_0.<GetAsyncSteps>b__3(SendTokenAsyncResult thisPtr, IAsyncResult r)
at Microsoft.Azure.Amqp.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.Amqp.AmqpCbsLink.<>c__DisplayClass4_0.<SendTokenAsync>b__1(IAsyncResult a)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.<CreateSendingLinkAsync>d__63.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.<OpenProducerLinkAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Azure.Messaging.EventHubs.Amqp.AmqpProducer.<CreateLinkAndEnsureProducerStateAsync>d__32.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.<OnCreateAsync>d__6.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.Amqp.Singleton`1.<GetOrCreateAsync>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.Amqp.Singleton`1.<GetOrCreateAsync>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Azure.Messaging.EventHubs.Amqp.AmqpProducer.<CreateBatchAsync>d__29.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Azure.Messaging.EventHubs.Producer.EventHubProducerClient.<CreateBatchAsync>d__42.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at <send>d__20`1.MoveNext() in  line 148

OperationCanceledException异常通常意味着事件中心服务操作超时。 在堆栈跟踪中,客户端在尝试建立到服务的 AMQP 链接并发送授权令牌时似乎超时。

这通常表示与服务的网络通信存在问题。 如果没有更多有关代码运行环境的上下文,我只能推测原因。

一种常见情况是在无法使用原始 TCP 通信的环境中运行时,例如 Xamarin Android。 另一种常见情况是在防火墙规则过滤传出连接的环境中运行时。 对于 TCP 传输,您需要确保标准 AMQP 端口 5671 和 5672 已打开并可用于传出连接。

若要解决这两种情况,可能需要尝试将EventHubProducerClientOptions上的TransportType设置为EventHubsTransportType.AmqpWebSockets

例如:

var options = new EventHubClientOptions();
options.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets;
await using var producer = new EventHubProducerClient(
"<< CONNECTION STRING >>", 
"<< EVENT HUB NAME >>", 
options);
// MORE CODE...

关于您的代码段,我想提到的一件重要事情是您可能会丢失数据。 因为您忽略了对TryAdd的返回值,所以如果您传入的枚举量大于单个批处理中可以发送的枚举量,则以静默方式无法添加它们。

我建议您考虑尊重TryAdd的返回,或者使用接受一组事件的SendAsync重载。 在前面的情况下,如果TryAdd返回false,则您知道批处理已满,并且应将集合分解为多个批处理。 在后一种情况下,如果集太大而无法在单个调用中发送,则调用将失败。

对于一些其他想法:

  • 我看不出您需要创建取消令牌的原因,因为您没有使用它来请求取消发送,因此您可能会跳过该步骤。

  • 为了方便起见,生产者客户端允许处置;与HttpClient一样,它作为长期客户端使用是有效的。 如果您要在一段时间内发送数据,我建议您创建一次数据,然后仅在应用程序关闭或发送完成一段时间后关闭/处置。

  • EventDataBatch是一次性的,并且包含对非托管项目的引用。 我建议确保在发送操作完成时释放它。

将其中一些反馈付诸行动,同时将生产者的范围限定为单个方法调用,示例如下所示:

public async void Send<T>(IEnumerable<T> data, string eventHubName)
{      
var options = new EventHubClientOptions();
options.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets; 

await using var producer = new EventHubProducerClient(
this._connectionString, 
eventHubName, 
options);
try
{
var eventSet =
data.Select(item => new EventData(Encoding.UTF8.GetBytes(item.ToString()));
await producer.SendAsync(eventSet).ConfigureAwait(false);
}
catch (Exception ex)
{
Log($"Error While sending message to Event Hub: { ex.Message}", ex);
throw;
}
}

有关更全面的示例,您可能需要查看:

  • 使用自定义选项创建客户端的事件中心示例

  • 用于发布多个批次的事件中心示例

最新更新