如何使 IReduceInit 从 next.jdbc 适应使用 cheshire 流式传输 JSON 到使用 ring



tl;博士如何将 IReduceInit 变成转换值的懒惰序列

我有一个数据库查询,它生成了一个相当大的数据集,用于客户端上的实时透视(百万或两行,25 个属性 - 对于现代笔记本电脑来说没有问题)。

我的(简化)堆栈是调用clojure.jdbc来获取(我认为是懒惰的)结果行序列。我可以通过 ring-json 中间件将其作为主体传递出来对其进行序列化。ring-json 在堆上构建响应字符串时存在问题,但从 0.5.0 开始,可以选择将响应流式传输出去。

通过分析几个失败案例,我发现实际上clojure.jdbc在将其交还之前正在内存中实现整个结果集。没关系!我没有在该库中使用reducible-query,而是决定迁移到新的 next.jdbc。

next.jdbc 中的键操作是plan它返回一个 IReduceInit,我可以使用它来运行查询并获取结果集......

(into [] (map :cc_id) (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
["675192"]

但是,这实现了整个结果集,在上述情况下,将预先和内存中为我提供所有ID。这不是一个问题,但我通常有很多。

如果我给出一个起始值,我就可以减少 IReduceInit 计划,所以我可以在减少函数中执行输出......(谢谢@amalloy)

(reduce #(println (:cc_id %2)) [] (jdbc/plan ds ["select cc_id from organisation where cc_id = '675192'"]))
675192
nil

。但理想情况下,我想在对它们应用转换函数后将这个 IReduceInit 转换为值的惰性序列,这样我就可以将它们与 ring-json 和 cheshire 一起使用。我没有看到任何明显的方法可以做到这一点。

reduce

与IReduceInit配合使用。IReduceInit 需要一个初始值,您在调用 .reduce 时指定了该值,但在使用 reduce 函数时未指定;这就解释了为什么你看到一个工作而不是另一个。

但是,这不会让您获得懒惰的序列。reduce合约的一部分是它急切地消耗整个输入(我们将忽略reduced它不会改变任何有意义的东西)。你的问题是更普遍的动态作用域问题的一个具体案例:JDBC 生成的序列只在某个上下文中"有效",你需要在这个上下文中进行所有处理,所以它不能偷懒。相反,你通常会把你的程序翻过来:不要使用返回值作为序列,而是给查询引擎传递一个函数,并说,"请用你的结果调用这个函数"。然后,引擎在调用该函数时确保数据有效,并在函数返回后清理数据。我不知道jdbc.next,但是对于较旧的jdbc,您可以使用类似db-query-with-resultset的东西。你会给它传递一些函数,该函数可以向挂起的HTTP响应添加字节,它会多次调用该函数。

这一切都有点模糊,因为我不知道你正在使用什么 HTTP 处理程序,或者它有什么工具可以非懒惰地处理流响应,但如果你想处理动态范围的资源,这是你必须采用的一般想法:懒惰不是一种选择。

令人沮丧。

为什么你不能用JDBC做到这一点呢?没有任何Clojure层?

(let [resultset (.executeQuery connection "select ...")]
(loop 
(when (.next resultset)
(let [row [(.getString resultset 1)
(.getString resultset 2)
...]])
(json/send row)
(recur)))
(json/end))

当然,使用 ResultSetMetaData,您可以自动将行生成为可以处理返回的任何内容的函数。

IReduceInit 允许在 reduce 函数退出时清盘 JDBC 资源。 这比 LazySeq 方法更可预测,后者可能永远不会释放 JDBC 资源。

您可以使用 BlockingQueue 和将来的任务来填充该队列,如下所示

(defn lazywalk-reducible
"walks the reducible in chunks of size n,
returns an iterable that permits access"
[n reducible]
(reify java.lang.Iterable
(iterator [_]
(let [bq (java.util.concurrent.ArrayBlockingQueue. n)
finished? (volatile! false)
traverser (future (reduce (fn [_ v] (.put bq v)) nil reducible)
(vreset! finished? true))]
(reify java.util.Iterator
(hasNext [_] (or (false? @finished?) (false? (.isEmpty bq))))
(next [_] (.take bq)))))))

当然,如果生成了一个迭代器,但没有遵循其结论,这将产生泄漏。

我没有彻底测试过它,它可能还有其他问题;但这种方法应该有效。

你也可以让它变得具体化clojure.lang.ISeq如果Java Iterable对你的用例来说不够好;但随后你开始陷入HeadKeep问题;以及如何处理对Object first()的调用,这将是非常可行的,但我不想过度思考这个问题。

我的懒惰 seq 是一个坏主意的原因有很多 - 即使我保证不抱头,结果流期间的异常问题无疑会让 ResultSet 闲置 - 序列化将发生在可以清理的调用堆栈之外。

懒惰的需求是由不实现记忆中的全部结果的愿望驱动的,需要 seq 或其他 coll? 是这样中间件会序列化它......

因此,直接使 IReduceInit JSONable ,然后绕过中间件。如果在序列化过程中出现异常,控件将从 next.jdbc 通过 IReduceInit,然后可以有意义地清理。

;; reuse this body generator from my patch to ring.middleware.json directly, as the coll? check will fail
(defrecord JsonStreamingResponseBody [body options]
ring-protocols/StreamableResponseBody
(write-body-to-stream [_ _ output-stream]
(json/generate-stream body (io/writer output-stream) options)))

;; the year long yak is shaved in 8 lines by providing a custom serialiser for IReduceInits…
(extend-type IReduceInit
cheshire.generate/JSONable
(to-json [^IReduceInit results ^JsonGenerator jg]
(.writeStartArray jg)
(let [rf (fn [_ ^IPersistentMap m]
(cheshire.generate/encode-map m jg))]
(reduce rf nil results))
(.writeEndArray jg)))
;; at this point I can wrap the result from next.jdbc/plan with ->JsonStreamingResponseBody into the :body of the ring response and it will stream

编写这些功能仍然感觉需要做很多工作,适配器代码总是让我担心我错过了一个简单的惯用方法。

相关内容

  • 没有找到相关文章

最新更新