链接多个并行异步工作流



我是一名对学习F#感兴趣的C#开发人员。我有一个简单的AWS lambda函数,当用户将新文件上传到S3存储时会触发它。然后解析每个文件,并将其内容发送到API网关。

代码基本上是功能性的,但我很难将所有异步函数链接在一起。到目前为止,我一直在(错误地(使用Async.RunSynchronously来获得概念证明。以下是主要功能的代码:

namespace MyProject
open Amazon.Lambda.Core
open Amazon
open Amazon.S3
open Amazon.S3.Util
open System.IO
open Amazon.S3.Model
open Amazon.SecretsManager.Extensions.Caching

[<assembly: LambdaSerializer(typeof<Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer>)>]
()
type Function() =    
member __.FunctionHandler (input: S3EventNotification) (_: ILambdaContext) =        
async {
use client = new AmazonS3Client(RegionEndpoint.EUWest1)
use secretsCache = new SecretsManagerCache()
// ApiClient.authenticate: SecretManagerCache -> Async<string>
// Sends a POST request to the API in order to obtain an authentication token
let! token = ApiClient.authenticate secretsCache
// ApiClient.getExistingIds: SecretManagerCache -> Async<string[]>
// Gets a list of already existing IDs from the API
let! existingIds = ApiClient.getExistingIds secretsCache
// input.Records is a C# List<S3EventNotificationRecord>
for record in input.Records do
// MyParser.processFile: AmazonS3Client -> S3EventNotificationRecord -> Async<MyJsonModel list>
// Downloads the actual contents of the file specified in the S3EventNotification
// and parses it using an FSharp.Data.JsonProvider into individual items
let! json = MyParser.processFile client record
// Split the items into a list that should be updated and a list that should be created
let (putList, postList) = json
|> List.partition (fun item ->
Array.contains item.Id existingIds)

for item in putList do
// ApiClient.putLocation: string -> SecretsManagerCache -> MyJsonModel -> Async<unit>
// Tries to PUT an item and writes the result into logs
ApiClient.putLocation token secretsCache item
|> ignore
for item in postList do
// ApiClient.postLocation: string -> SecretsManagerCache -> MyJsonModel -> Async<unit>
// Tries to POST an item and writes the result into logs
ApiClient.postLocation token secretsCache item
|> ignore
} //??? What to put here? Async.RunSynchronously?

把代码写成文字:

  1. 首先,我需要获得API的身份验证令牌
  2. 然后,我需要从API中获取已经存在的项(或它们的ID(的列表
  3. 接下来,我加载并解析每个上传的文件。这可以并行完成(可能使用Async.Parallel(
  4. 每个文件都会生成一个项目列表,然后将其拆分为一个用于更新的列表和一个用于创建的列表
  5. 来自putListpostList的所有项目然后被发送到API。记录每个请求的结果。两个列表也可以并行处理

我最头疼的是如何";附上";如果所有操作都是使用Async.Parallel完成的,则将解析项的POST和PUTting转换为实际解析。此外,我是否需要在FunctionHandler的末尾添加一个Async.RunSynchronously,或者即使没有这个语句也会执行它?

最后,我已经调用了几个Async.AwaitTask函数来转换AmazonS3ClientSecretsManagerCache对象提供的C#Task<T>对象。Async.AwaitTask是简单地将Task<T>转换为Async<T>,还是以某种方式改变了异步计算的流程?

您的函数应该使用异步签名,因为它按照aws-doc:进行长时间处理

member __.FunctionHandler ... : Threading.Tasks.Task<'T>

你可以用完成异步

async {
return true
}
|> Async.StartAsTask

在函数内部,每次有Seq<Async 'T>并且需要Async<'T[]>时,都可以使用Async.Parallel。在您的情况下,您有Seq<Async<unit>>,因此需要忽略Async.ignore的结果。

async{
do!   putList
|>Seq.map (ApiClient.putLocation token secretsCache)
|>Async.parallel
|>Async.ignore
...
}

输入记录的处理也可以并行化,如果您将所有处理打包到Async<unit>函数中,就像我上面为putlist和putlocation显示的那样。

最新更新