异步/等待争用条件



我相信我下面有一个竞争条件。 我正在手动构建一个带有 JSON 输出的HttpResponseMessage以异步流式传输。 问题似乎出在计数器 (i) 上。 我想在从列表中第一次写入后的任何元素之前添加一个逗号。

在列表的开头,有时第一次写入之后的前几条记录(我看到多达 3 条)将没有前面的逗号。 该数字不一致,有时按预期工作。 我在本地计算机上没有看到它,但在具有更强大硬件的已部署环境中,它存在。

var LastUpdate = JsonConvert.SerializeObject(dt);
var pre = $"{{ "LastUpdate": {LastUpdate}, "List":[";
var post = "]}";
HttpResponseMessage response = Request.CreateResponse();
response.Content = new PushStreamContent(
    async (stream, http, context) =>
    {
        try
        {
            int i = 0;
            var buffer = Encoding.UTF8.GetBytes(pre);
            await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
            var query = getQuery(id);                               
            await query
                .ForEachAsync(async entity =>
                {
                    var student = MapRecord(entity);
                    if (student != null)
                    {
                        var json = JsonConvert.SerializeObject(student);
                        buffer = Encoding.UTF8.GetBytes(((i > 0) ? ", " : "") + json);
                        await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
                        i++;
                    }
                }, cancellationToken).ConfigureAwait(false);
            buffer = Encoding.UTF8.GetBytes(post);
            await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
        }

如果您使用的是QueryableExtentions.ForEachAsync(谢谢@juharr),那么是的,您有竞争条件。

该方法的签名为:

public static Task ForEachAsync<T>(
    this IQueryable<T> source,
    Action<T> action
)

请注意,该方法接受Action<T>。在异步世界中,这相当于async void。这意味着每次在异步委托中await时,ForEachAsync迭代器实际上是在继续下一个元素,而不是等待委托完成。

相反(如果未在非常大的数据集上调用查询),请使用常规 foreach 语句并在其中await

foreach (var entity in query)
{
    var student = MapRecord(entity);
    if (student != null)
    {
        var json = JsonConvert.SerializeObject(student);
        buffer = Encoding.UTF8.GetBytes(((i > 0) ? ", " : "") + json);
        await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken)
                    .ConfigureAwait(false);
        i++;
    }
}

如果您主要关注 (i) 计数器,则可以使用 Interlocked.Increment 安全地线程递增 i。这将导致少量的性能争用同步,但您可以通过这种方式持续更新 i。

例:

Interlocked.Increment(ref i);

互锁。增量 MSDN

相关内容

  • 没有找到相关文章

最新更新