合并多个IAsyncEnumerable流



随着MediatR 10的发布,现在有了一种范式,允许开发人员创建由IAsyncEnumerable支持的流。我正在利用这个范式创建多个不同的文件系统观察程序来监视多个文件夹。为了监视文件夹,我使用了两种不同的方法:轮询和FileSystemWatcher。在我的管道中,所有文件夹监视器都被聚合到一个IEnumerable<IAsyncEnumerable<FileRecord>>中。每种观察程序内部都有一个循环,一直运行到通过CancellationToken请求取消为止。

这是轮询观察程序:

/// <summary>
/// MediatR stream handler that re-scans <c>request.Folder</c> every <c>request.Interval</c>
/// and yields a <see cref="FileRecord"/> for each file not yet present in the seen-file store.
/// </summary>
public class PolledFileStreamHandler :
    IStreamRequestHandler<PolledFileStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly IPublisher _publisher;
    private readonly ILogger<PolledFileStreamHandler> _logger;

    public PolledFileStreamHandler(
        ISeenFileStore seenFileStore,
        IPublisher publisher,
        ILogger<PolledFileStreamHandler> logger)
    {
        _logger = logger;
        _publisher = publisher;
        _seenFileStore = seenFileStore;
    }

    /// <summary>
    /// Polls the requested folder until <paramref name="cancellationToken"/> is cancelled.
    /// Each new file is added to the seen-file store, announced via a
    /// <c>FileSeenNotification</c>, and then yielded.
    /// </summary>
    public async IAsyncEnumerable<FileRecord> Handle(
        PolledFileStream request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Filled concurrently by the parallel scan, then drained by this iterator.
        var found = new ConcurrentQueue<FileRecord>();

        // Marks one path as seen, publishes the notification, and buffers the record.
        async ValueTask RecordAsync(string path, CancellationToken token)
        {
            var record = new FileRecord(path);
            _seenFileStore.Add(path);
            await _publisher.Publish(new FileSeenNotification { FileInfo = record }, token);
            found.Enqueue(record);
        }

        while (!cancellationToken.IsCancellationRequested)
        {
            // Lazily filtered: each path is checked against the store as the scan pulls it.
            var unseen = Directory.EnumerateFiles(request.Folder)
                .Where(path => !_seenFileStore.Contains(path));

            // CancellationToken.None: a scan that has started is not interrupted by cancellationToken.
            await Parallel.ForEachAsync(unseen, CancellationToken.None, RecordAsync);

            // TODO: consider yielding while the parallel scan is still running instead of afterwards.
            while (found.TryDequeue(out var next))
            {
                yield return next;
            }

            _logger.LogInformation("PolledFileStreamHandler watching {Directory} at: {Time}", request.Folder, DateTimeOffset.Now);

            // Awaiting a cancelled Task.Delay would throw; the continuation completes normally
            // instead, so cancellation ends the stream through the loop condition.
            var pause = Task.Delay(request.Interval, cancellationToken);
            await pause.ContinueWith(_ => { }, CancellationToken.None);
        }
    }
}

以及基于FileSystemWatcher的观察程序:

/// <summary>
/// MediatR stream handler that yields a <see cref="FileRecord"/> for every file created in
/// <c>request.Folder</c>, as reported by a <see cref="FileSystemWatcher"/>.
/// </summary>
/// <remarks>
/// Watcher events are buffered in <c>_queue</c> and served by <see cref="Handle"/>. The queue and
/// the stored event handler are instance fields, so this type presumably expects one
/// <see cref="Handle"/> call per instance (e.g. a transient registration) — confirm against the DI setup.
/// </remarks>
public class FileSystemStreamHandler :
    IStreamRequestHandler<FileSystemStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly ILogger<FileSystemStreamHandler> _logger;
    private readonly IPublisher _publisher;
    private readonly ConcurrentQueue<FileRecord> _queue;
    // Kept so the exact delegate can be unsubscribed during teardown.
    private FileSystemEventHandler? _tearDown;

    public FileSystemStreamHandler(
        ISeenFileStore seenFileStore,
        ILogger<FileSystemStreamHandler> logger,
        IPublisher publisher)
    {
        _seenFileStore = seenFileStore;
        _logger = logger;
        _publisher = publisher;
        _queue = new ConcurrentQueue<FileRecord>();
    }

    /// <summary>
    /// Watches <c>request.Folder</c> and yields each newly created file until
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    public async IAsyncEnumerable<FileRecord> Handle(
        FileSystemStream request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var watcher = SetupWatcher(request.Folder, cancellationToken);
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Drain everything buffered since the last tick. Yielding a single record per
                // 100 ms tick would cap throughput at ~10 files/s and fall behind on bursts.
                while (!cancellationToken.IsCancellationRequested && _queue.TryDequeue(out var record))
                {
                    yield return record;
                }

                // Cancellation cuts the wait short; the continuation swallows the cancellation
                // so the loop condition ends the stream instead of an exception.
                await Task.Delay(100, cancellationToken)
                    .ContinueWith(_ => { }, CancellationToken.None);
            }
        }
        finally
        {
            // Runs on cancellation AND when the consumer stops enumerating early.
            TearDownWatcher(watcher);
        }
    }

    /// <summary>Creates a watcher on <paramref name="folder"/> that forwards Created events.</summary>
    private FileSystemWatcher SetupWatcher(string folder, CancellationToken cancellation)
    {
        var watcher = new FileSystemWatcher(folder)
        {
            NotifyFilter = NotifyFilters.Attributes
                | NotifyFilters.CreationTime
                | NotifyFilters.DirectoryName
                | NotifyFilters.FileName
                | NotifyFilters.LastAccess
                | NotifyFilters.LastWrite
                | NotifyFilters.Security
                | NotifyFilters.Size,
        };

        _tearDown = (_, args) => OnWatcherOnChanged(args, cancellation);
        watcher.Created += _tearDown;

        // Enable only after subscribing so no Created event can be raised with no handler attached.
        watcher.EnableRaisingEvents = true;
        return watcher;
    }

    /// <summary>
    /// Handles one Created event: skips paths already seen and directories, otherwise buffers a
    /// <see cref="FileRecord"/> and publishes a <c>FileSeenNotification</c>.
    /// </summary>
    /// <remarks>
    /// This is an <c>async void</c> event callback: any exception escaping it would be rethrown on
    /// the thread pool and terminate the process, so failures are logged here instead.
    /// </remarks>
    private async void OnWatcherOnChanged(FileSystemEventArgs args, CancellationToken cancellationToken)
    {
        var path = args.FullPath;
        if (_seenFileStore.Contains(path)) return;

        _seenFileStore.Add(path);
        try
        {
            if ((File.GetAttributes(path) & FileAttributes.Directory) != 0) return;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("File {File} was not found. During a routine check. Will not be broadcast", path);
            return;
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            _logger.LogWarning(ex, "Could not read attributes of {File}. Will not be broadcast", path);
            return;
        }

        var record = new FileRecord(path);
        _queue.Enqueue(record);
        try
        {
            await _publisher.Publish(new FileSeenNotification { FileInfo = record }, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutting down: the record is already queued, and there is nothing to report.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish FileSeenNotification for {File}", path);
        }
    }

    /// <summary>Stops the watcher, detaches the handler and releases the watcher's resources.</summary>
    private void TearDownWatcher(FileSystemWatcher watcher)
    {
        watcher.EnableRaisingEvents = false;
        if (_tearDown != null)
        {
            watcher.Created -= _tearDown;
        }

        // FileSystemWatcher is IDisposable; unsubscribing alone would leave it undisposed.
        watcher.Dispose();
    }
}

最后,下面这个类将所有内容连接在一起,并尝试监视这些流(在StartAsync方法中)。您会注意到其中使用了System.Interactive.Async中的Merge运算符,它目前没有按预期工作。

/// <summary>
/// Creates one MediatR stream per registered <see cref="IFileStream"/> and logs every file those
/// streams report, consuming all of them concurrently.
/// </summary>
public class StreamedFolderWatcher : IDisposable
{
    private readonly ConcurrentBag<Func<IAsyncEnumerable<FileRecord>>> _streams;
    // Every handler stream is created with this source's token; StopAsync cancels it.
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly IMediator _mediator;
    private readonly ILogger<StreamedFolderWatcher> _logger;

    public StreamedFolderWatcher(
        IMediator mediator,
        IEnumerable<IFileStream> fileStreams,
        ILogger<StreamedFolderWatcher> logger)
    {
        _mediator = mediator;
        _logger = logger;
        _streams = new ConcurrentBag<Func<IAsyncEnumerable<FileRecord>>>();
        _cancellationTokenSource = new CancellationTokenSource();
        foreach (var fileStream in fileStreams)
        {
            AddStream(fileStream, _cancellationTokenSource.Token);
        }
    }

    /// <summary>Registers a factory that opens a MediatR stream for <paramref name="request"/>.</summary>
    private void AddStream<T>(
        T request,
        CancellationToken cancellationToken)
        where T : IStreamRequest<FileRecord>
    {
        _streams.Add(() => _mediator.CreateStream(request, cancellationToken));
    }

    /// <summary>
    /// Consumes all registered streams concurrently, logging each file as soon as any stream yields
    /// it, until <paramref name="cancellationToken"/> is cancelled or <see cref="StopAsync"/> is called.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // Stop on either the caller's token or StopAsync. The previous code replaced the field with a
        // linked source that nothing observed, so StopAsync had no effect and the original source leaked.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken, _cancellationTokenSource.Token);
        var stopping = linked.Token;

        // Must be an array passed to the params overload: AsyncEnumerableEx.Merge(params T[]) merges
        // concurrently, whereas the IEnumerable<IAsyncEnumerable<T>> extension (what `list.Merge()`
        // binds to) drains the sources one after another, so an endless first stream starved the rest.
        var streams = _streams.Select(s => s()).ToArray();
        while (!stopping.IsCancellationRequested)
        {
            try
            {
                await foreach (var file in AsyncEnumerableEx.Merge(streams).WithCancellation(stopping))
                {
                    _logger.LogInformation("Incoming file {File}", file);
                }
            }
            catch (OperationCanceledException) when (stopping.IsCancellationRequested)
            {
                // Requested shutdown: finish quietly.
                break;
            }

            await Task.Delay(1000, stopping)
                .ContinueWith(_ => { }, CancellationToken.None);
        }
    }

    /// <summary>Signals the running <see cref="StartAsync"/> loop and every handler stream to stop.</summary>
    public Task StopAsync()
    {
        _cancellationTokenSource.Cancel();
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _cancellationTokenSource.Dispose();
        GC.SuppressFinalize(this);
    }
}

我对Merge行为的期望是:如果我有3个IAsyncEnumerable,那么每一项都应该在产生后立即发出。实际情况却是,除非我在循环中的某处放置yield break,否则第一个被拉取的IStreamRequestHandler会一直执行下去,直到取消令牌强制其停止。

如何将多个输入IAsyncEnumerable合并为一个长期运行的输出流,使任一输入每产生一个结果,输出流都立即将其发出?

最小可复现示例

// Minimal repro: ten endless sequences, each emitting a random value after a random delay.
// A sequence stops only when its token is cancelled, which this sample never does.
static async IAsyncEnumerable<(Guid Id, int Value)> CreateSequence(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var rng = new Random();
    var sequenceId = Guid.NewGuid();
    while (true)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            yield break;
        }

        var pause = TimeSpan.FromMilliseconds(rng.Next(100, 1000));
        await Task.Delay(pause);
        yield return (sequenceId, rng.Next(0, 10));
    }
}

var cts = new CancellationTokenSource();
IEnumerable<IAsyncEnumerable<(Guid Id, int Value)>> sequences =
    Enumerable.Range(0, 10).Select(_ => CreateSequence(cts.Token));

// NOTE: this call binds to the Merge(this IEnumerable<IAsyncEnumerable<T>>) extension overload,
// which does not merge concurrently; with endless sources only the first one produces output.
// That sequential behaviour is exactly what this repro demonstrates.
var merged = sequences.Merge();
await foreach (var item in merged)
{
    var (id, value) = item;
    Console.WriteLine($"[{DateTime.Now.ToShortTimeString()}] Value {value} Emitted from {id}");
}

Rx团队在Merge运算符上似乎出了差错:它的各个重载行为并不一致。下面这个重载是并发的:

// Array overload: merges all sources concurrently.
// It has no `this` modifier, so it cannot be invoked with extension syntax: `array.Merge()`
// binds to the IEnumerable<IAsyncEnumerable<TSource>> extension overload instead. Call this
// one as a static method on its declaring class, passing the array as the argument.
public static IAsyncEnumerable<TSource> Merge<TSource>(
params IAsyncEnumerable<TSource>[] sources);

此重载不支持并发:

// Extension overload: does not exploit concurrency (see the REVIEW note in the Rx source).
// With endless sources, the first one is drained indefinitely and the others are never enumerated.
public static IAsyncEnumerable<TSource> Merge<TSource>(
this IEnumerable<IAsyncEnumerable<TSource>> sources);

来自源代码内部的注释:

// REVIEW:
// This implementation does not exploit concurrency. We should not introduce such
// behavior in order to avoid breaking changes, but we could introduce a parallel
// ConcurrentMerge implementation. It is unfortunate though that the Merge
// overload accepting an array has always been concurrent, so we can't change that
// either (in order to have consistency where Merge is non-concurrent, and
// ConcurrentMerge is).

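因此,您需要先用.ToArray()把序列集合转换成数组,再调用数组重载。注意该重载不是扩展方法(没有this修饰符),所以应写成AsyncEnumerableEx.Merge(sequences.ToArray()),而不是sequences.ToArray().Merge()——后者仍会绑定到非并发的IEnumerable扩展重载。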

我设法想出了一个可行、但可能效率不高且有潜在缺陷的解决方案。通过将每个IAsyncEnumerable放入各自的后台任务中,我可以把产生的每个元素发送到一个线程安全队列中,元素一旦可用,就会从那里被取出并发出。

/// <summary>
/// Merges several async sequences into one, emitting each element as soon as any source
/// produces it (rather than draining the sources one after another).
/// </summary>
/// <param name="sources">The sequences to merge; each is enumerated on its own background task.</param>
/// <param name="debounceTime">
/// Optional pause inserted after each batch of buffered elements has been emitted. When <c>null</c>,
/// elements are emitted as they arrive; while nothing is buffered the consumer waits asynchronously
/// instead of spinning.
/// </param>
/// <param name="cancellationToken">Cancels enumeration of all sources; the merged sequence then ends quietly.</param>
/// <returns>Every element of every source, in arrival order.</returns>
/// <remarks>
/// If a source fails (other than by cancellation), the merged sequence rethrows that exception after
/// the elements already buffered have been emitted. If the consumer stops enumerating early, the
/// background enumerations are cancelled.
/// </remarks>
public static async IAsyncEnumerable<TSource> MergeAsyncEnumerable<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources,
    TimeSpan? debounceTime = default,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Linked so that abandoning the merged sequence also stops the producers.
    using var producerCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var channel = System.Threading.Channels.Channel.CreateUnbounded<TSource>(
        new System.Threading.Channels.UnboundedChannelOptions { SingleReader = true });

    var producers = SetupCollections(sources, channel.Writer, producerCancellation.Token);
    _ = CompleteWhenAllDoneAsync(producers, channel.Writer);

    var reader = channel.Reader;
    try
    {
        // WaitToReadAsync keeps returning true while items remain (even after the writer completes),
        // so no trailing elements are lost; it returns false once completed and empty, and throws the
        // producer's exception if the writer was completed with one.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var record))
            {
                yield return record;
            }

            await WaitIfDebounce(debounceTime, producerCancellation.Token);
        }
    }
    finally
    {
        producerCancellation.Cancel();
    }
}

/// <summary>
/// Returns a task that waits <paramref name="debounceTime"/> (if any); cancellation completes it
/// normally rather than faulting it.
/// </summary>
private static Task WaitIfDebounce(
    TimeSpan? debounceTime,
    CancellationToken cancellationToken)
{
    return debounceTime.HasValue
        ? Task.Delay(debounceTime.Value, cancellationToken)
            .ContinueWith(_ => { }, CancellationToken.None)
        : Task.CompletedTask;
}

/// <summary>Starts one background task per source that copies its elements into <paramref name="writer"/>.</summary>
private static IList<Task> SetupCollections<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    System.Threading.Channels.ChannelWriter<TSource> writer,
    CancellationToken cancellationToken)
{
    return sources
        .Select(source => Task.Run(async () =>
        {
            await foreach (var item in source.WithCancellation(cancellationToken))
            {
                await writer.WriteAsync(item, cancellationToken);
            }
        }, cancellationToken))
        .ToList();
}

/// <summary>
/// Completes <paramref name="writer"/> once every producer has finished, forwarding a failure so the
/// reader observes it; cancellation completes the writer without an error.
/// </summary>
private static async Task CompleteWhenAllDoneAsync<TSource>(
    IEnumerable<Task> producers,
    System.Threading.Channels.ChannelWriter<TSource> writer)
{
    try
    {
        await Task.WhenAll(producers);
        writer.TryComplete();
    }
    catch (OperationCanceledException)
    {
        writer.TryComplete();
    }
    catch (Exception ex)
    {
        writer.TryComplete(ex);
    }
}

关于合并IAsyncEnumerable流,我的一点看法:

/// <summary>
/// Interleaves several async enumerators, yielding each <see cref="JobResult"/> as soon as whichever
/// enumerator produces it first.
/// </summary>
/// <param name="enumerators">
/// Enumerators to merge; all are advanced concurrently. They are not disposed here — the caller that
/// created them keeps ownership.
/// </param>
/// <param name="cancellationToken">
/// Checked between items; once cancelled, the merge stops with <see cref="OperationCanceledException"/>.
/// </param>
/// <remarks>
/// If any enumerator's MoveNextAsync faults or is cancelled, that exception is rethrown from the merge.
/// Previously such a task was never removed, so Task.WhenAny returned it immediately on every pass and
/// the loop spun forever while the error was swallowed.
/// </remarks>
public async IAsyncEnumerable<JobResult> Merge(List<IAsyncEnumerator<JobResult>> enumerators, [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var inProgress = new List<(Task<bool> MoveNext, IAsyncEnumerator<JobResult> Enumerator)>(enumerators.Count);
    foreach (var enumerator in enumerators)
    {
        inProgress.Add((enumerator.MoveNextAsync().AsTask(), enumerator));
    }

    while (inProgress.Count > 0)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var finished = await Task.WhenAny(inProgress.Select(item => item.MoveNext));
        var index = inProgress.FindIndex(item => item.MoveNext == finished);
        var source = inProgress[index].Enumerator;
        inProgress.RemoveAt(index);

        // Awaiting (instead of checking Status == RanToCompletion) rethrows a faulted or cancelled
        // MoveNextAsync; a false result means this enumerator is exhausted and is simply dropped.
        if (await finished)
        {
            yield return source.Current;
            inProgress.Add((source.MoveNextAsync().AsTask(), source));
        }
    }
}

相关内容

  • 没有找到相关文章