数据库转储到具有"rows completed"副作用的文本文件



我正在尝试让一个函数从选择语句将数据库 sql 转储写入文本文件。 返回的卷可能非常大,我有兴趣尽快完成此操作。

对于一个大型结果集,我还需要记录每个 x 间隔写入的总行数以及自上次 x 间隔以来每秒写入的行数。 我有一个(映射)实际上在(打开)期间进行写入,所以我相信记录完成的行的副作用应该在那里发生。 (请参阅代码中的注释)。

我的问题是:

  1. 如何在间隔期间写"每秒行数"和"到目前为止的总行数"?
  2. 在将大型 jdbc 结果集写入文件(或命名管道、批量加载程序等)时,我还需要记住什么吗?
  3. 地图)函数周围的(doall)是否获取所有结果...使其不懒惰且可能占用大量内存?
  4. 固定宽度是否可以作为一种选择? 我相信命名管道到批量加载器会更快。 权衡将在磁盘 I/O 上代替下游解析的 CPU 利用率。 但是,这可能需要对返回的结果集进行内省(使用 .getMetaData?

    (ns metadata.db.table-dump 
      [:use 
       [clojure.pprint]
       [metadata.db.connections] 
       [metadata.db.metadata]
       [clojure.string :only (join)]
       [taoensso.timbre :only (debug info warn error set-config!)]
       ]
      [:require
       [clojure.java.io       :as io ] 
       [clojure.java.jdbc     :as j  ]     
       [clojure.java.jdbc.sql :as sql]     
       ]
      )
    (set-config! [:appenders :spit :enabled?] true)
    (set-config! [:shared-appender-config :spit-filename] "log.log")
    (let [
          field-delim    "t" 
          row-delim      "n" 
          report-seconds 10 
          sql            "select * from comcast_lineup "
          joiner         (fn [v] (str (join field-delim v ) row-delim ) )
          results        (rest (j/query local-postgres  [sql ] :as-arrays? true :row-fn joiner ))  
          ]
      (with-open [wrtr (io/writer "test.txt")]
          (doall 
              (map #(.write wrtr %) 
                  ; Somehow in here i want to log with (info ) rows written so
                  ; far, and "rows per second" every 10 seconds.  
                  results )) 
        ) (info "Completed write") )
    

几个一般提示:

  • 在 JDBC 级别,您可能需要使用 setFetchSize 来避免在将整个结果集加载到 Clojure 之前将其加载到 RAM 中。 请参阅 Statement.setFetchSize(nSize) 方法在 SQL Server JDBC 驱动程序中真正做了什么?
  • 确保 clojure.java.jdbc 实际上返回了一个懒惰的 seq(可能是?)-- 如果不是,请考虑结果集序列
  • doall确实会强制整个事情在RAM中;请尝试doseq
  • 考虑使用atom来保持写入的行数;您可以使用它来编写到目前为止的行等。

素描:

(let [ .. your stuff ..
      start (System/currentTimeMillis)
      row-count (atom 0)]
  (with-open [^java.io.Writer wrtr (io/writer "test.txt")]
    (doseq [row results]
      (.write wrtr row)
      (swap! row-count inc)
      (when (zero? (mod @row-count 10000))
        (println (format "written %d rows" @row-count))
        (println (format "rows/s %.2f" (rate-calc-here)))))))

您可能会从我对惯用语的回答中获得一些用处,以进行进度报告?

具体针对您的情况

1)您可以将索引添加到映射中作为匿名函数的第二个参数,然后在要映射的函数中查看索引以查看您正在编写的行。可用于更新原子。

user> (def stats (atom {}))
#'user/stats
user> (let [start-time (. (java.util.Date.) getTime)] 
         (dorun (map (fn [line index] 
                       (println line) ; write to log file here
                       (reset! stats [{:lines index 
                                       :start start-time 
                                       :end (. (java.util.Date.) getTime)}])) 
                     ["line1" "line2" "line3"] 
                     (rest (range)))))
line1
line2
line3
nil
user> @stats  
[{:lines 3, :start 1383183600216, :end 1383183600217}] 
user>  

然后可以每隔几秒钟打印/记录一次stats的内容以更新 UI

3)您肯定希望使用dorun而不是doall因为您怀疑这会耗尽足够大的数据集上的内存。 dorun在写入结果时会丢弃结果,因此如果您想等待足够长的时间,可以在无限大的数据上运行它。

最新更新