我正在从Flux
向S3写入一个大型JSON数组。同时收集和串行化整个对象流具有实质性的内存含义。这让我将其重新解释为多部分上传,代码大致如下:
results
.map(this::serialize)
.map(
bytes ->
uploadBytes(
bytes,
filename,
bucket,
index.getAndIncrement(),
uploadId))
这意味着在任何给定时间,只有results
的单个元素需要在存储器中串行化。这表面上是有效的,但不会产生有效的JSON,因为合并后的文件不是逗号分隔或用括号定界的。
我们可以添加额外的逻辑来检查上传的索引,这样第一个元素就准备了一个[
,其他每个元素都准备了,
。给出结构:
[result1
+,result2
+…+,resultX
+。。。
这可能由以下代码决定:
ByteArrayOutputStream output = new ByteArrayOutputStream();
if (index == 1) {
output.write('[');
} else {
output.write(',');
}
output.write(bytes);
这个策略仍然省略了最后一个括号,因为我们不知道当前元素是否是最后一个元素。S3文件部分的最小大小也是5mb。最糟糕的情况是最终上传填充了5mb空白的]
。
有没有一种惯用的方法来确定Flux
中的任何给定元素是否是最后一个,以及是否直接跟随一个完整的信号?
为了实现这一点,我只写了一些大意为:
results
.map(this::serialize)
.concatWithValues((byte) ']')
然后在5MB的缓冲区中将这些上传到S3(而不是每个元素(。因此,当5MB缓冲区被填充时,元件可以被串行化。