我正在学习使用Blazor WebAssembly和SignalR。我希望有一个WebSocket连接(因此是一个HubConnection实例(,并在所有需要通过SignalR WebSocket做一些事情的Blazor组件之间共享它。
我想打电话给HubConnection。StreamAsync((,然后在数据进入时在生成的流上循环(这是一个无尽的流(。我还希望它在组件不再显示时取消流。
我对这个想法有两个问题:
- 只有一个组件能够流式传输值。其他部件卡在";等待foreach";并且从不接收任何项目
- 在组件的处置过程中,我触发了取消令牌,但服务器端没有收到取消。只有当我刷新整个浏览器时,才会触发取消。
- 当我遍历到任何其他Blazor页面,然后返回到流组件所在的页面时,现在没有任何组件接收项目
我有一种模糊的预感,那就是这两件事是以某种方式连接起来的——在服务器端,一个集线器只能在单个连接中真正响应单个StreamAsync((调用?
最后,我一直找不到解决这个问题的办法。我做错了什么?也许你们中有人能帮我解决这个问题?
代码
要复制,您可以按照说明操作:
从Blazor WebAssembly与ASP。NET MVC后端项目。我使用的是.net core 6,大多数nuget都是6.0.7版本。我还安装了nuget Microsoft。AspNetCore。信号R。Blazor客户项目的客户。
进行以下更改/更新:
客户端-应用程序剃刀添加此代码
@using Microsoft.AspNetCore.SignalR.Client
@implements IAsyncDisposable
@inject HubConnection HubConnection
...
@code {
private CancellationTokenSource cts = new();
protected override void OnInitialized()
{
base.OnInitialized();
HubConnection.Closed += error =>
{
return ConnectWithRetryAsync(cts.Token);
};
_ = ConnectWithRetryAsync(cts.Token);
}
private async Task<bool> ConnectWithRetryAsync(CancellationToken token)
{
// Keep trying to until we can start or the token is canceled.
while (true)
{
try
{
await HubConnection.StartAsync(token);
return true;
}
catch when (token.IsCancellationRequested)
{
return false;
}
catch
{
// Try again in a few seconds. This could be an incremental interval
await Task.Delay(5000);
}
}
}
public async ValueTask DisposeAsync()
{
cts.Cancel();
cts.Dispose();
await HubConnection.DisposeAsync();
}
}
客户端-程序.cs将以下单例添加到DI
builder.Services.AddSingleton(sp =>
{
var navMan = sp.GetRequiredService<NavigationManager>();
return new HubConnectionBuilder()
.WithUrl(navMan.ToAbsoluteUri("/string"))
.WithAutomaticReconnect()
.Build();
});
客户端-创建一个名为";StringDisplay">
@using Microsoft.AspNetCore.SignalR.Client
@inject HubConnection HubConnection
@implements IDisposable
@if(currentString == string.Empty)
{
<i>Loading...</i>
}
else
{
@currentString
}
@code {
private string currentString = string.Empty;
private CancellationTokenSource cts = new();
protected override void OnInitialized()
{
base.OnInitialized();
_ = Consumer();
}
protected override void OnParametersSet()
{
base.OnParametersSet();
_ = Consumer();
}
private async Task Consumer()
{
try
{
cts.Cancel();
cts.Dispose();
cts = new();
var stream = HubConnection.StreamAsync<string>("GetStrings", cts.Token);
await foreach(var str in stream)
{
if(cts.IsCancellationRequested)
break;
currentString = str;
StateHasChanged();
}
}
catch(Exception e)
{
Console.WriteLine(e);
}
}
public void Dispose()
{
cts.Cancel();
cts.Dispose();
}
}
客户端-索引.rarzor在页面上添加StringDisplay组件3次:
<hr />
<StringDisplay /><hr />
<StringDisplay /><hr />
<StringDisplay /><hr />
服务器-创建StringGeneratorService.cs
namespace BlazorWebAssembly.Server.Services;
public class StringGeneratorService
{
private readonly PeriodicTimer _timer;
public event Action<string>? OnGenerated;
public StringGeneratorService()
{
_timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
Task.Run(TimerRunnerAsync);
}
private async Task TimerRunnerAsync()
{
while (true)
{
await _timer.WaitForNextTickAsync();
var str = Guid.NewGuid().ToString();
OnGenerated?.Invoke(str);
}
}
}
服务器-创建StringHub.cs
using BlazorWebAssembly.Server.Services;
using Microsoft.AspNetCore.SignalR;
using System.Runtime.CompilerServices;
namespace BlazorWebAssembly.Server.Hubs
{
public class StringHub : Hub
{
private readonly StringGeneratorService _generatorService;
public StringHub(StringGeneratorService generatorService)
{
_generatorService = generatorService;
}
public async IAsyncEnumerable<string> GetStrings([EnumeratorCancellation] CancellationToken cancellationToken)
{
using var flag = new AutoResetEvent(false);
string currentString = string.Empty;
var listener = (string str) => { currentString = str; flag.Set(); };
_generatorService.OnGenerated += listener;
cancellationToken.Register(() =>
{
_generatorService.OnGenerated -= listener;
});
while (!cancellationToken.IsCancellationRequested)
{
flag.WaitOne();
yield return currentString;
}
yield break;
}
}
}
服务器-程序.cs注册必要的部件
builder.Services.AddSingleton<StringGeneratorService>();
...
app.MapHub<StringHub>("/string");
我自己也发现了这个问题。
核心问题是SignalR集线器是单线程的单线程。
如果你看StringHub.cs
,在while
循环中有一行:flag.WaitOne();
。这会阻塞Hub实现的整个线程。
我为自己做了一个AutoResetEvent
的扩展,它允许我异步地等待信号。扩展如下:
public static async Task<bool> WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken)
{
try
{
return await Task.Run(waitHandle.WaitOne, cancellationToken);
}
catch (TaskCanceledException)
{
return false;
}
}
有了这个分机,我可以用下面的线路等待信号:await flag.WaitOneAsync(cancellationToken);
所以请记住,集线器本身就是单线程的!不要挡住他们的去路!