core.async 循环阻止等待从通道读取



假设我有一个频道out (chan)。我需要获取放入通道的值并添加它们。值的数量是不确定的(因此不能使用带有(<! out)的结尾情况的传统循环),并且来自外部 IO。我正在使用固定timeoutalts!但这似乎不是解决问题的最佳方法。到目前为止,我有以下内容(我从 https://gist.github.com/schaueho/5726a96641693dce3e47 得到的)

(go-loop
[[v ch] (alts! [out (timeout 1000)])
acc 0]
(if-not v
(do (close! out)
(deliver p acc))
(do
(>! task-ch (as/progress-tick))
(recur (alts! [out (timeout 1000)]) (+ acc v)))))

我遇到的问题是,1000 的超时有时是不够的,并导致 go-loop 过早退出(因为 IO 操作可能需要超过 1000 毫秒才能完成并将 val 放入out通道)。我不认为增加超时值是一个好主意,因为它可能会导致我等待超过必要的时间。

保证所有读取从输出通道并正确退出循环的最佳方法是什么?

更新:

为什么使用超时?因为放入通道中的值数量不是固定的;这意味着,我无法创建退出案例。在没有退出的情况下,go-loop将无限期地等待((<! out))值被放入通道中。如果你有一个没有超时的解决方案,那真的很棒。

我怎么知道我已读取最后一个值?我没有。这就是问题所在。这就是我使用超时和alts的原因!!以退出 GO-循环。

你想用结果做什么?现在简单添加。然而,这不是重要的一点。

最终更新:

我想出了一种方法来获取我将要处理的值的数量。所以我修改了我的逻辑来利用它。我仍然会使用超时和替代!以防止任何锁定。

(go-loop
[[v _] (alts! [out (timeout 1000)])
i 0
acc 0]
(if (and v (not= n i))
(do
(>! task-ch (as/progress-tick))
(recur (alts! [out (timeout 1000)]) (inc i) (+ acc v)))
(do (close! out)
(deliver p* (if (= n i) acc nil)))))

我认为你的问题在你的设计中有点高,而不是特定于核心异步的问题:

一方面,通道中的值数量不确定 — 可能有 0,可能有 10,也可能有 1,000,000。

另一方面,您想阅读所有这些,进行一些计算,然后返回。这是不可能做到的——除非有其他信号可以用来说"我想我现在完成了"。

如果该信号是值的计时,那么您使用alts!的方法是正确的,尽管我相信代码可以简化一点。

更新:您是否可以访问"上游"IO?当 IO 操作完成时,您能否将哨兵值(例如类似::closed的值)放入通道?

"最佳"方法是等待发件人关闭来自 out 或 out 的特殊批处理结束消息以标记输入的结束。

无论哪种方式,解决方案都取决于发送方传达有关输入的内容。

最新更新