我有下面的代码:使用Task.Delay(500000)时它可以正常运行,但使用Task.Delay(5000)时得不到任何结果,因为留给程序产生预期输出的时间太短。
我想重新设计这段代码,使其不再依赖Task.Delay(),因为每次执行的响应时间可能会有所不同。我该怎么做?
注:已按照Roald在回答中建议的方法修改了代码。之前关于Change Feed中Task.Delay()的问题无法通过异步方式解决。另一种方法是使用Pull模型而不是Push模型。
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using System;
using System.Collections.Generic;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// Settings consumed by the <c>GetChangeFeed</c> extension method when it builds
/// the change feed processor and the bounded channel that buffers its output.
/// </summary>
public class ChangeFeedProcessorOptions
{
/// <summary>Capacity passed to <c>BoundedChannelOptions</c> for the buffering channel.</summary>
public int BufferCapacity { get; set; }
/// <summary>Name passed to <c>GetChangeFeedProcessorBuilder</c>.</summary>
public string ProcessorName { get; set; }
/// <summary>Container passed to <c>WithLeaseContainer</c>.</summary>
public Container LeaseContainer { get; set; }
/// <summary>Name passed to <c>WithInstanceName</c>.</summary>
public string InstanceName { get; set; }
/// <summary>Start time passed to <c>WithStartTime</c>.</summary>
public DateTime StartTime { get; set; }
}
/// <summary>
/// Console entry point plus the <c>GetChangeFeed</c> extension method that exposes a
/// Cosmos DB change feed processor as an <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
/// <remarks>
/// The class is declared <c>static</c> because C# only permits extension methods
/// (methods whose first parameter is marked <c>this</c>) inside non-generic static classes.
/// </remarks>
static class Program
{
    /// <summary>
    /// Reads the change feed of <c>testContainer</c>, starting seven days back, prints each
    /// document and stops after six of them.
    /// </summary>
    static async Task Main()
    {
        // CosmosClient is IDisposable; dispose it once the enumeration below has finished.
        using var client = new CosmosClient("AccountEndpoint = https://test.documents.azure.com:443/;AccountKey=oaEOA==;");
        var database = client.GetDatabase("testDatabase");
        var container = database.GetContainer("testContainer");
        var options = new ChangeFeedProcessorOptions
        {
            BufferCapacity = 10,
            InstanceName = "ChangeFeedInstanceName",
            LeaseContainer = database.GetContainer("leases"),
            ProcessorName = "ChangeFeedProcessorName",
            // Do the arithmetic in UTC: subtracting from local time and converting afterwards
            // can be off by an hour when a DST transition falls inside the interval.
            StartTime = DateTime.UtcNow.AddDays(-7)
        };

        var count = 0;
        // NOTE(review): `document` must resolve to a type declared elsewhere in the project;
        // it is not defined in this file - confirm.
        await foreach (var doc in container.GetChangeFeed<document>(options))
        {
            // Console has no Write overload with a parameter named `b`; print one document per line.
            Console.WriteLine(doc);
            count++;
            if (count == 6)
            {
                // Leaving the loop disposes the async enumerator, which runs the
                // finally block in GetChangeFeed and stops the processor.
                break;
            }
        }
    }

    /// <summary>
    /// Starts a change feed processor on <paramref name="self"/> and yields every item it
    /// delivers, buffered through a bounded channel.
    /// </summary>
    /// <typeparam name="T">Type the change feed items are delivered as.</typeparam>
    /// <param name="self">Container whose change feed is read.</param>
    /// <param name="options">Processor name, instance name, lease container, start time and buffer capacity.</param>
    /// <param name="cancellationToken">Cancels waiting for further items.</param>
    /// <returns>An asynchronous stream of changed items; the processor is stopped when enumeration ends.</returns>
    public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Bounded + Wait: when the consumer falls behind, the handler below waits in
        // WriteAsync instead of growing the buffer without limit.
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.BufferCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true
        });

        var processor = self
            .GetChangeFeedProcessorBuilder<T>(options.ProcessorName, async (items, cancellation) =>
            {
                foreach (var item in items)
                {
                    await channel.Writer.WriteAsync(item, cancellation);
                }
            })
            .WithLeaseContainer(options.LeaseContainer)
            .WithInstanceName(options.InstanceName)
            .WithStartTime(options.StartTime)
            .Build();

        await processor.StartAsync();
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return item;
            }
        }
        finally
        {
            // Runs on normal completion, cancellation, and when the caller stops enumerating early.
            await processor.StopAsync();
        }
    }
}
}
我认为最灵活的方法是首先将更改源(Change Feed)转换为IAsyncEnumerable,这样您就可以使用LINQ或直接的命令式代码来处理它。
您可以使用此扩展方法获得IAsyncEnumerable
SDK版本<=3.15.0
/// <summary>
/// Immutable settings for the channel-based <c>GetChangeFeed</c> extension method
/// (the variant that builds a change feed processor).
/// </summary>
public record ChangeFeedProcessorOptions
{
/// <summary>Capacity passed to <c>BoundedChannelOptions</c> for the buffering channel.</summary>
public int BufferCapacity { get; init; }
/// <summary>Name passed to <c>GetChangeFeedProcessorBuilder</c>.</summary>
public string ProcessorName { get; init; }
/// <summary>Container passed to <c>WithLeaseContainer</c>.</summary>
public Container LeaseContainer { get; init; }
/// <summary>Name passed to <c>WithInstanceName</c>.</summary>
public string InstanceName { get; init; }
/// <summary>Start time passed to <c>WithStartTime</c>.</summary>
public DateTime StartTime { get; init; }
}
/// <summary>
/// Runs a change feed processor against <paramref name="self"/> and streams the items it
/// delivers through a bounded channel.
/// </summary>
/// <typeparam name="T">Type the change feed items are delivered as.</typeparam>
/// <param name="self">Container whose change feed is read.</param>
/// <param name="options">Processor name, instance name, lease container, start time and buffer capacity.</param>
/// <param name="cancellationToken">Cancels waiting for further items.</param>
/// <returns>An asynchronous stream of changed items; the processor is stopped when enumeration ends.</returns>
public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var bufferSettings = new BoundedChannelOptions(options.BufferCapacity)
    {
        SingleWriter = true,
        SingleReader = true,
        FullMode = BoundedChannelFullMode.Wait
    };
    var buffer = Channel.CreateBounded<T>(bufferSettings);

    // Handler invoked by the processor for each batch: forward every change into the buffer.
    async Task ForwardChangesAsync(IReadOnlyCollection<T> changes, CancellationToken handlerToken)
    {
        foreach (var change in changes)
        {
            await buffer.Writer.WriteAsync(change, handlerToken);
        }
    }

    var feedProcessor = self
        .GetChangeFeedProcessorBuilder<T>(options.ProcessorName, ForwardChangesAsync)
        .WithLeaseContainer(options.LeaseContainer)
        .WithInstanceName(options.InstanceName)
        .WithStartTime(options.StartTime)
        .Build();

    await feedProcessor.StartAsync();
    try
    {
        var pending = buffer.Reader.ReadAllAsync(cancellationToken);
        await foreach (var change in pending)
        {
            yield return change;
        }
    }
    finally
    {
        // Reached on completion, cancellation, or when the caller stops enumerating early.
        await feedProcessor.StopAsync();
    }
}
SDK版本>3.15.0
/// <summary>
/// Immutable settings for the iterator-based (pull model) <c>GetChangeFeed</c> extension method.
/// </summary>
public record ChangeFeedProcessorOptions
{
/// <summary>Point in time passed to <c>ChangeFeedStartFrom.Time</c>.</summary>
public DateTime StartTime { get; init; }
/// <summary>Delay passed to <c>Task.Delay</c> before polling again when no changes are reported.</summary>
public TimeSpan PollInterval { get; init; }
}
/// <summary>
/// Streams the change feed of <paramref name="self"/> using the pull model: pages are read
/// from a change feed iterator and, when no changes are reported, the method waits
/// <c>options.PollInterval</c> before asking again.
/// </summary>
/// <typeparam name="T">Type the change feed items are delivered as.</typeparam>
/// <param name="self">Container whose change feed is read.</param>
/// <param name="options">Start time and polling interval.</param>
/// <param name="cancellationToken">Cancels both the reads and the wait between polls.</param>
/// <returns>An asynchronous stream of changed items.</returns>
public static async IAsyncEnumerable<T> GetChangeFeed<T>(this Container self, ChangeFeedProcessorOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // NOTE(review): later SDK releases appear to require a ChangeFeedMode argument on
    // GetChangeFeedIterator, and ChangeFeedStartFrom.Time presumably expects a UTC DateTime -
    // confirm both against the SDK version in use.
    // The iterator is disposable; release it when enumeration ends for any reason.
    using var iterator = self.GetChangeFeedIterator<T>(ChangeFeedStartFrom.Time(options.StartTime));
    while (iterator.HasMoreResults)
    {
        FeedResponse<T> items = null;
        try
        {
            items = await iterator.ReadNextAsync(cancellationToken);
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
        {
            // Some SDK versions report "no changes" by throwing; handled below together
            // with the non-throwing case.
        }

        // "No changes" can also arrive as an ordinary response with status 304. Without this
        // check the loop would poll again immediately, with no delay between requests.
        if (items is null || items.StatusCode == HttpStatusCode.NotModified)
        {
            await Task.Delay(options.PollInterval, cancellationToken);
            continue;
        }

        foreach (var item in items)
        {
            yield return item;
        }
    }
}
然后像这样使用:
/// <summary>
/// Example consumer: enumerates the change feed of <c>testContainer</c> and stops after
/// six documents.
/// </summary>
/// <remarks>
/// NOTE(review): <c>ChangeFeedInstanceName</c>, <c>ChangeFeedProcessorName</c>, <c>MaxAge</c>,
/// <c>Recording</c> and <c>WriteObject</c> are not defined in this snippet; they are assumed
/// to come from the surrounding project.
/// </remarks>
static async Task Main()
{
    // CosmosClient is IDisposable; dispose it once the enumeration below has finished.
    using var client = new CosmosClient("AccountEndpoint = https://test.documents.azure.com:443/;AccountKey=oaEOA==;");
    var database = client.GetDatabase("testDatabase");
    var container = database.GetContainer("testContainer");
    var options = new ChangeFeedProcessorOptions
    {
        BufferCapacity = 10,
        InstanceName = ChangeFeedInstanceName,
        LeaseContainer = database.GetContainer("leases"),
        ProcessorName = ChangeFeedProcessorName,
        // Do the arithmetic in UTC: subtracting from local time and converting afterwards
        // can be off by an hour when a DST transition falls inside the interval.
        StartTime = DateTime.UtcNow.Subtract(MaxAge)
    };

    var count = 0;
    await foreach (var doc in container.GetChangeFeed<Recording>(options))
    {
        WriteObject(doc, b: true);
        count++;
        if (count == 6)
        {
            // Leaving the loop disposes the async enumerator and ends the feed.
            break;
        }
    }
}
如果添加System.Linq.Async包,代码会更简洁:
// Take(6) limits the stream to six documents, so no manual counter or break is needed.
// NOTE(review): Take on IAsyncEnumerable is presumably the System.Linq.Async operator; WriteObject is defined outside this snippet.
await foreach (var doc in container.GetChangeFeed<Recording>(options).Take(6))
{
WriteObject(doc, b: true);
}
对于不同的实现,您也可以看看这里,它使用两个信号量而不是一个通道来实现相同的结果。
PowerShell cmdlet中的异步代码
出现问题的原因是:在cmdlet中,对WriteObject、WriteVerbose、WriteWarning等方法的调用必须来自主线程。
要解决此问题,您需要在ProcessRecord中运行一个消息泵,并在需要调用上述任何方法时,通过它将消息发送回主线程;这与在WinForms或WPF中使用Dispatcher的做法相同。
负责此操作的库是PowerShellAsync。
使用该库后,您的代码将变为:
/// <summary>
/// Cmdlet (verb <c>VerbsCommon.Get</c>, noun "ChangedRecording") that writes the first six
/// change feed documents to the pipeline.
/// </summary>
/// <remarks>
/// NOTE(review): <c>AsyncCmdlet</c> is presumably the base class from the PowerShellAsync
/// library; <c>options</c> and the container are elided in this snippet.
/// </remarks>
[Cmdlet(VerbsCommon.Get, "ChangedRecording")]
[OutputType(typeof(Recording))]
public class SyncRecording : AsyncCmdlet
{
// ...
/// <summary>Enumerates the change feed and writes each document with <c>WriteObject</c>.</summary>
protected override async Task ProcessRecordAsync()
{
// Placeholder: obtain the Cosmos DB container here.
var container = ...;
// Take(6) ends the enumeration after six documents.
await foreach (var doc in container.GetChangeFeed<Recording>(options).Take(6))
{
WriteObject(doc, b: true);
}
}
}