Clojure core.async.如何用打开的go块懒惰地下载



这是我之前的问题如何在 clojure 中按部分生成惰性序列的延续?

我想从数据库中分部分下载数据。最初,我下载前 500 行,然后发送请求以获取接下来的 500 行,依此类推,直到我从服务器收到所有数据。

我写了代码:

(jdbc/atomic conn
 (with-open [cursor (jdbc/fetch-lazy conn [sql_query])]
   (let [lazyseq (jdbc/cursor->lazyseq cursor)
         counter (atom 1)]
     (swap! lazyseq_maps assoc :session_id {:get_next? (chan 1) :over_500 (chan 1) :data []})
     (>!! (:get_next? (:session_id @lazyseq_maps)) true)
     (go
       (doseq [row lazyseq]
         (swap! counter inc)
         (when (<! (:get_next? (:session_id @lazyseq_maps)))
           (swap! lazyseq_maps update-in [:session_id :data] conj row)
           (if (not= 0 (mod @counter 500))
             (>! (:get_next? (:session_id @lazyseq_maps)) true)
             (>! (:over_500 (:session_id @lazyseq_maps)) true))))
        ;
        (close! (:get_next? (:session_id @lazyseq_maps)))
        (close! (:over_500 (:session_id @lazyseq_maps)))
        (.close conn))
     (when (<!! (:over_500 (:session_id @lazyseq_maps))) {:message "over 500 rows"
                                                          :id :session_id
                                                          :data (:data (:session_id @lazyseq_maps))}))))

我在剂量q循环的帮助下获取行。当 doseq 超过 500 行时,我将循环(when (<! (:get_next? (:session_id @lazyseq_maps)))停放并等待来自外部的信号以检索接下来的 500 行。

但是在这里我有一个问题。当我发送信号时,程序抛出错误"结果集已关闭"。即连接在开放范围之外关闭。但我不明白为什么,因为 go 块被放置在开放范围内。你能帮我解决问题吗?

(go ...)立即返回,因此,(with-open ...) 也是如此。

您可能希望以相反的方式执行此操作:

(go (with-open ...))

但是,请注意,此过程将保留数据库连接(稀缺资源!(很长时间,这可能是不可取的,并且由于go块而与拥有"轻量级"线程的好处背道而驰。以下是一些需要考虑的替代方案:

  • 也许您可以为每个批次重新打开数据库连接?
  • 也许您可以急切地将整个结果集流式传输到外部存储(例如 AWS S3(,并让客户端对此进行轮询?

除非您使用的是内存严重受限的系统,否则我建议您一次将所有行加载到 RAM 并关闭数据库连接。否则,您的完整解决方案可能会非常复杂,难以测试和推理。

如果您有数千万行,也许您可以在某些分区中获取它们?

最新更新