如何将文件部分流式传输到数据库(r2dbc-postgres)



问题说明了这一点。我在春季使用r2dbc无法找到任何关于将字节数组流式传输到postgresql数据库的信息,例如文件上传。

我可以通过读取像这样的文件的所有字节来存储字节:

@PostMapping("/upload")
suspend fun upload(
@RequestPart("file") filePartMono: Mono<FilePart>
): User {
val filePart = filePartMono.awaitFirstOrNull() ?: throw UploadException("Missing file part")
var inputStream = filePart.content().awaitFirst().asInputStream()
val byteStream = ByteArrayOutputStream()
filePart.content()
.flatMap { dataBuffer -> Flux.just(dataBuffer.asByteBuffer().array()) }
.collectList()
.awaitFirst()
.forEach { bytes -> byteStream.write(bytes) }
val bytes = byteStream.toByteArray()
fileRepository.save(File(bytes));
}

但是我想将filePart.content()流式传输到数据库。我还感兴趣的是通过控制器将bytea从postgres流式传输到客户端。

在的自述中https://github.com/pgjdbc/r2dbc-postgresql,一个Postgresbytea(它是Postgres中的BLOB类型(可以自动映射到byte[]ByteBuffer和R2dbcBlob

我试图在SpringBoot3/SpringDataR2dbc项目中将bytea映射到以上3种类型。

架构文件:

CREATE TABLE IF NOT EXISTS posts (
-- id SERIAL PRIMARY KEY,
id UUID DEFAULT uuid_generate_v4(),
title VARCHAR(255),
content VARCHAR(255),
metadata JSON default '{}',
-- In this sample, use Varchar to store enum(name), Spring Data R2dbc can convert Java Enum to pg VARCHAR, and reverse.
status VARCHAR(255) default 'DRAFT',
created_at TIMESTAMP , --NOT NULL DEFAULT LOCALTIMESTAMP,
updated_at TIMESTAMP,
attachment bytea,
cover_image bytea,
cover_image_thumbnail bytea,
version INTEGER,
PRIMARY KEY (id)
);

和映射的Entity类。

@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "posts")
class Post {
@Id
@Column("id")
private UUID id;
@Column("title")
private String title;
@Column("content")
private String content;
@Column("metadata")
private Json metadata;
@Column("status")
private Status status;
@Column("attachment")
private ByteBuffer attachment;
@Column("cover_image")
private byte[] coverImage;
@Column("cover_image_thumbnail")
private Blob coverImageThumbnail;
@Column("created_at")
@CreatedDate
private LocalDateTime createdAt;
@Column("updated_at")
@LastModifiedDate
private LocalDateTime updatedAt;
@Column("version")
@Version
private Long version;
enum Status {
DRAFT, PENDING_MODERATION, PUBLISHED;
}
}

不幸的是,只有byte[]型字段工作良好。请参阅Spring Data关系问题#1408。

如果您现在想使用其他类型(ByteBuffer和R2dbc Blob(,则需要额外的自定义转换器。

@Configuration
public class DataR2dbcConfig {
// see: https://github.com/spring-projects/spring-data-relational/issues/1408
@Bean
public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
return R2dbcCustomConversions.of(
DialectResolver.getDialect(connectionFactory),
List.of(
new ByteArrayToByteBufferConverter(),
new ByteBufferToByteArrayConverter(),
new ByteArrayToBlobConverter(),
new BlobToByteArrayConverter()
)
);
}
}

@ReadingConverter
class ByteArrayToByteBufferConverter implements Converter<byte[], ByteBuffer> {
@Override
public ByteBuffer convert(byte[] source) {
return ByteBuffer.wrap(source);
}
}
@WritingConverter
class ByteBufferToByteArrayConverter implements Converter<ByteBuffer, byte[]> {
@Override
public byte[] convert(ByteBuffer source) {
return source.array();
}
}
@ReadingConverter
class ByteArrayToBlobConverter implements Converter<byte[], Blob> {
@Override
public Blob convert(byte[] source) {
return Blob.from(Mono.just(ByteBuffer.wrap(source)));
}
}
@WritingConverter
class BlobToByteArrayConverter implements Converter<Blob, byte[]> {
@Override
public byte[] convert(Blob source) {
return Mono.from(source.stream()).block().array();
}
}

在PostRepositoryTest中导入配置,所有这些类型现在都在工作。

在示例项目中,有一个PostController,它包括一个使用ByteBuffer上传/下载示例端点。

@PutMapping("{id}/attachment")
public Mono<ResponseEntity<?>> upload(@PathVariable UUID id,
@RequestPart Mono<FilePart> fileParts) {
return Mono
.zip(objects -> {
var post = (Post)objects[0];
var filePart = (DataBuffer)objects[1];
post.setAttachment(filePart.toByteBuffer());
return post;
},
this.posts.findById(id),
fileParts.flatMap(filePart -> DataBufferUtils.join(filePart.content()))
)
.flatMap(this.posts::save)
.map(saved -> ResponseEntity.noContent().build());
}

@GetMapping("{id}/attachment")
public Mono<Void> read(@PathVariable UUID id, ServerWebExchange exchange) {
return this.posts.findById(id)
.log()
.map(post -> Mono.just(new DefaultDataBufferFactory().wrap(post.getAttachment())))
.flatMap(r -> exchange.getResponse().writeWith(r));
}

更新,增加了文件上传和下载的集成测试。

相关内容

  • 没有找到相关文章

最新更新