API速率限制器间歇性悬挂



我写了一个简单的(我以为...(速率限制器,以将事件驱动的系统保持在我们的许可API命中限制下。由于某些原因,它在发送400-500次请求后抓住了有时

我最好的主意是我已经搞砸了等待功能,因此在某些情况下它永远不会返回,但是我无法找到有缺陷的逻辑。另一个想法是,我已经对异步/任务Interop造成了问题,从而引起了问题。它最初始终工作,然后再起作用。ApiRateLimiter的一个实例在几个组件上共享,以尊重HIT限制系统。

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>
type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =
let requestLimit = Math.Max(limitCount,1)
let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox -> 
    let rec waitUntilUnderLimit (recentRequestsTimeSent: seq<DateTimeOffset>) = async{
        let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
        let requestsWithinLimit = 
            recentRequestsTimeSent 
            |> Seq.filter(fun x -> x >= cutoffTime)
            |> Seq.toList
        if requestsWithinLimit.Length >= requestLimit then
            let! _ = Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
            return! waitUntilUnderLimit requestsWithinLimit
        else
            return requestsWithinLimit
    }
    let rec messageLoop (mostRecentRequestsTimeSent: seq<DateTimeOffset>) = async{
        // read a message
        let! keyedRequest,replyChannel = inbox.Receive()
        // wait until we are under our rate limit
        let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent
        let rightNow = DateTimeOffset.UtcNow
        let! response =
            keyedRequest.Request
            |> httpClient.SendAsync
            |> Async.AwaitTask
        replyChannel.Reply { Key = keyedRequest.Key; Response = response }
        return! messageLoop (seq {
            yield rightNow
            yield! remainingRecentRequests
        })
    }
    // start the loop
    messageLoop (Seq.empty<DateTimeOffset>)
)            
member this.QueueApiRequest keyedRequest =
    async {
        return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
    } |> Async.StartAsTask

某些请求很大,需要一点时间,但是没有什么可以导致我对此事物看到的请求的总死亡。

感谢您花点时间看!

我注意到您正在构建一个最近通过使用SEQ发送请求的列表:

seq {
    yield rightNow
    yield! remainingRecentRequests
}

由于f#序列很懒惰,因此会产生一个枚举者,当被要求下一个值时,它将首先产生一个值,然后开始通过其子seq迭代并产生一个值。每次您产生新的请求时,都会添加一个新的枚举者 - 但是何时处置了旧枚举?您可能会认为,一旦waitUntilUnderLimit中的Seq.filter返回false,它们将被处置。但是请考虑一下:F#编译器将如何知道过滤条件始终是错误的?没有深度代码分析(编译器不做(,就无法进行。因此,"旧"的seq永远无法收集垃圾,因为如果需要,它们仍然被保留在周围。我不是100%确定的,因为我尚未测量您的代码的内存使用情况,但是如果您要测量ApiRateLimiter实例的内存使用,我敢打赌,您会稳步增长而不会降低。

我还注意到您在SEQ的 Front 上添加了新项目。这与F#列表使用的语义完全相同,但是对于列表,没有可以分配的对象,并且一旦列表项目失败了List.filter条件,它将被处置。因此,我重写了您的代码以使用最近的列表,而不是SEQ,我对效率进行了另一个更改:因为您创建列表的方式可以保证它会被整理,最新事件首先是最古老的事件,我用List.takeWhile替换List.filter。这样,第一次约会比截止年龄大的那一刻,它将停止检查较旧的日期。

随着这种更改,您现在应该有旧的日期实际上到期,并且ApiRateLimiter类的内存使用情况应波动,但保持恒定。(每次调用waitUntilUnderLimit时,它都会创建新列表,因此它会产生一些GC压力,但它们都应该在第0代中(。我不知道这是否会解决您的挂问题,但这是我在您的代码中看到的唯一问题。

顺便说一句,我还用do! Async.Sleep 100替换了您的行CC_11,这更简单。这里没有效率的提高,但是无需使用let! _ =等待Async<unit>返回;这正是do!关键字的目的。

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>
type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =
    let requestLimit = Math.Max(limitCount,1)
    let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox -> 
        let rec waitUntilUnderLimit (recentRequestsTimeSent: DateTimeOffset list) = async{
            let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
            let requestsWithinLimit = 
                recentRequestsTimeSent 
                |> List.takeWhile (fun x -> x >= cutoffTime)
            if List.length requestsWithinLimit >= requestLimit then
                do! Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
                return! waitUntilUnderLimit requestsWithinLimit
            else
                return requestsWithinLimit
        }
        let rec messageLoop (mostRecentRequestsTimeSent: DateTimeOffset list) = async{
            // read a message
            let! keyedRequest,replyChannel = inbox.Receive()
            // wait until we are under our rate limit
            let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent
            let rightNow = DateTimeOffset.UtcNow
            let! response =
                keyedRequest.Request
                |> httpClient.SendAsync
                |> Async.AwaitTask
            replyChannel.Reply { Key = keyedRequest.Key; Response = response }
            return! messageLoop (rightNow :: remainingRecentRequests)
        }
        // start the loop
        messageLoop []
    )            
    member this.QueueApiRequest keyedRequest =
        async {
            return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
        } |> Async.StartAsTask

相关内容

  • 没有找到相关文章

最新更新