使 IObservable 订阅并发



>我有以下代码

string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
.Subscribe(PointCloudMergerCompleted);

SolverManagementService _solverManagementService在哪里

Public class SolverManagementService : ISolverManagementService
{
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
... // Other methods. 
}

但是这里_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token)很昂贵,虽然这返回了一个Task<IPointCloud>但我不会线程化这个和这个调用块。由于RecursivelyMergeAsync返回一个Task<IPointCloud>它可以等待,所以我修改了代码以使用async/await

public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(async (pairCollection) =>
{
observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}

但现在它会立即返回,控制台应用关闭。我相信这可以在不需要Semephores的情况下完成,但我是 RX 的新手。如何将RecursivelyMergeAsync配置为为每个返回的pairCollection并发运行,而不会阻止并在所有递归合并完成时收到通知?

注意。在单元测试中,我执行以下操作

public class IcpBatchSolverServiceTests
{
private Mock<ISettingsProvider> _mockSettingsProvider; 
private IIcpBatchSolverService _icpBatchSolverService;
[OneTimeSetUp]
public void Setup()
{
_mockSettingsProvider = new Mock<ISettingsProvider>();
_mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
_mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.SetMinimumLevel(LogLevel.Trace);
builder.AddSerilog(Log.Logger);
})
.BuildServiceProvider();
ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
.GetService<ILoggerFactory>()
.CreateLogger<IcpBatchSolverServiceTests>();
_icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
}
[Test]
public async Task CanSolveBatchAsync()
{
IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);
IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);
Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
}
}

这个过程完美地同时进行。


编辑概述当提供具有命名约定的不同几何的文件文件夹(不同角度的不同几何的深度图(时我需要做什么处理。NNNN.exr,其中NNNN是某个数值。对于一批文件。

  1. 使用不同几何图形的文件名将这些文件批处理到集合中。

对于每个文件批处理

  1. [*串行*] 调用C++ API 以从图像文件中提取深度图。
  2. [*并行*] 将DepthMaps转换为PointClouds。 这可以一次完成。
  3. [*并行*] 使用 ICP 算法合并点云(昂贵(,但将TaskScheduler的并发性限制为两个线程(根据机器架构/内存等选择(

最后,我使用步骤 3 中的合并点云再次调用C++ API。所以在 RX 中,我当前的完整管道看起来像

public class SolverManagementService : ISolverManagementService
{
private readonly IIcpBatchSolverService _icpBatchSolverService;
private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
private readonly ILogger<SolverManagementService> _logger;
public SolverManagementService(
IIcpBatchSolverService icpBatchSolverService,
IDepthMapToPointCloudAdapter pointCloudAdapter,
ILogger<SolverManagementService> logger)
{
_icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
_pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
_logger = logger; 
}
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<PairCollection<IPointCloud>>(
observer =>
{
Parallel.ForEach(
Utils.GetFileBatches(dataDirectory), 
(fileBatch) =>
{
var producer = RawDepthMapProducer(fileBatch, token);
ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();
producer.Subscribe(rawDepthMap =>
{
bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
}, 
onCompleted: () =>
{
PointCloudPartitioningService ps = new PointCloudPartitioningService();
observer.OnNext(ps.Partition(bag.ToList()));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
$"for file set "{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}"");
});
});
observer.OnCompleted();
return () => { };
});
}
public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
{
return Observable.Create<RawDepthMap>(
observer =>
{
int index = 0;
foreach(var filePath in filePaths)
{
token.ThrowIfCancellationRequested();
var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);
observer.OnNext(extractor.GetDepthMap(filePath, index++));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from "{filePath}"");
}
observer.OnCompleted();
return () => { };
});
}
}

我正在寻找:1.上面的代码有什么问题,_icpBatchSolverService.RecursivelyMergeAsync返回一个Task<IPointCloud并且是并发的,我希望这个拖车同时运行。2. 我的代码还有什么问题?

我将留下一个通用的答案,因为上面的代码太广泛了,无法将其归结为。

有两种语法可用于定义异步行为。第一个是async/await模式,第二个是较旧的模式,是Subscribe()模式(反应式(。

异步与并发是一回事吗?

不,绝对不是。 对于那些可能正在阅读本文但不知道的人来说,异步意味着"它稍后发生",而不是"它同时发生"。通过使用其中任一语法,您可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理来自 Web 服务器的响应。您需要发出请求,然后在响应返回时执行某些操作。

并发性是不同的。例如,您可以使用Task.Run()Parallel.ForEach()调用并发。在这两种情况下,您都在定义一个分叉。在Task.Run的情况下,您可以稍后执行Task.WaitAll。在Parallel.ForEach的情况下,它将为您进行分叉/连接。 当然,反应式有自己的一组分叉/连接操作。

当我等待或订阅时会发生什么?

以下两行代码都具有相同的行为,并且该行为使许多程序员感到困惑:

var result = await myAsync();

myObservable.Subscribe(result => { ... });

在这两种情况下,程序的控制流都以可预测但可能令人困惑的方式移动。在第一种情况下,控制流在等待await时返回到父调用方。在第二行中,控制流移动到下一行代码,在返回结果时调用 lambda 表达式。

我在学习如何使用这些变量的人中看到的一个常见事情是尝试将 lambda 中的变量分配给父作用域中的地址。这是行不通的,因为该范围在执行 lambda 之前很久就不复存在了。 使用async/await不太可能做一些愚蠢的事情,但您还必须记住,控制流将沿着调用堆栈向上移动,直到定义下一个同步操作。本文对此进行了更深入的解释,本文也更容易理解。

相关内容

  • 没有找到相关文章

最新更新