从Flux到S3的JSON串行化中的内存高效元素处理



我正在从Flux向S3写入一个大型JSON数组。同时收集和串行化整个对象流具有实质性的内存含义。这让我将其重新解释为多部分上传,代码大致如下:

results
.map(this::serialize)
.map(
bytes ->
uploadBytes(
bytes,
filename,
bucket,
index.getAndIncrement(),
uploadId))

这意味着在任何给定时间,只有results的单个元素需要在存储器中串行化。这表面上是有效的,但不会产生有效的JSON,因为合并后的文件不是逗号分隔或用括号定界的。

我们可以添加额外的逻辑来检查上传的索引,这样第一个元素就准备了一个[,其他每个元素都准备了,。给出结构:

[result1+,result2+…+,resultX+。。。

这可能由以下代码决定:

ByteArrayOutputStream output = new ByteArrayOutputStream();
if (index == 1) {
output.write('[');
} else {
output.write(',');
}
output.write(bytes);

这个策略仍然省略了最后一个括号,因为我们不知道当前元素是否是最后一个元素。S3文件部分的最小大小也是5mb。最糟糕的情况是最终上传填充了5mb空白的]

有没有一种惯用的方法来确定Flux中的任何给定元素是否是最后一个,以及是否直接跟随一个完整的信号?

为了实现这一点,我只写了一些大意为:

results
.map(this::serialize)
.concatWithValues((byte) ']')

然后在5MB的缓冲区中将这些上传到S3(而不是每个元素(。因此,当5MB缓冲区被填充时,元件可以被串行化。

相关内容

  • 没有找到相关文章