使用Axon流式传输不同类型的多个事件



我正在使用Axon和ServerSentEvents为客户端/服务器通信构建流式API,不确定是否可以使用Axon查询更新发射器和订阅查询来流式传输和识别多个不同的事件。

我使用AxonQueryUpdateEmitter.emit从基于不同事件的投影中发出事件。发射器是在投影中发射的,而订阅查询是在REST API中进行的,该API应该将服务器发送的事件流式传输到客户端。

例如,我想为一个创建、更新和删除实体的用例发出3个不同的事件
我想知道我们是否可以从不同的事件中发出不同类型的数据,但仍然组合在一个流中,即在发射器中创建和更新实体时发送实际对象,但由于我没有任何实体/数据可以在删除时发出,我想是否发送一条简单的删除消息?

我还想要一种在发出时指定事件类型的方法,这样当ServerSentEvent从订阅查询构建时,我可以指定类型/操作(例如,区分创建或更新事件(以及数据。

主要思想是发出不同的事件并将其添加到一个流中,尽管知道所有事件可能不会作为一个订阅查询的一部分返回完全相同的数据(创建、更新和删除(,并且能够准确识别事件并在ServerSentEvents流中指定适当的事件类型。

关于我该如何做到这一点,有什么想法吗?

以下是我如何使用QueryUpdateEmitter:在创建时发出事件

@EventHandler
public void on(LibraryCreatedEvent event, @Timestamp Instant timestamp) {
final LibrarySummaryEntity librarySummary = mapper.createdEventToLibrarySummaryEntity(event, timestamp);
repository.save(librarySummary);
log.debug("On {}: Saved the first summary of the library named {}", event.getClass().getSimpleName(), event.getName());
queryUpdateEmitter.emit(
AllLibrarySummariesQuery.class,
query -> true,
librarySummary
);
log.debug("emitted library summary: {}", librarySummary.getId());
}

由于我需要区分创建和更新,所以我尝试在更新事件时使用GenericSubscriptionQueryUpdateMessage.asUpdateMessage,并添加了一些元数据,但不确定这是否正确,因为我不确定如何在订阅查询期间检索该信息。

Map<String, String> map = new HashMap();
map.put(“Book Updated”, event.getLibraryId());
queryUpdateEmitter.emit(AllLibrarySummariesQuery.class,query → true,GenericSubscriptionQueryUpdateMessage.asUpdateMessage(librarySummary).withMetaData(map));

以下是我创建订阅查询的方式:

SubscriptionQueryResult<List<LibrarySummaryEntity>, LibrarySummaryEntity> result = queryGateway.subscriptionQuery(new AllLibrarySummariesQuery(),ResponseTypes.multipleInstancesOf(LibrarySummaryEntity.class),ResponseTypes.instanceOf(LibrarySummaryEntity.class));

以及我正在构建服务器发送事件的部分:(.event是我想指定事件类型的地方-创建/更新/删除并相应地发送适用的数据(

Flux<ServerSentEvent<LibrarySummaryResponseDto>> sseStream = result.initialResult()
.flatMapMany(Flux::fromIterable).map(value -> mapper.libraryEntityToResponseDto(value))
.concatWith((streamingTimeout == -1)? result.updates().map(value -> mapper.libraryEntityToResponseDto(value)): result.updates().take(Duration.ofMinutes(streamingTimeout)).map(value -> mapper.libraryEntityToResponseDto(value)))
.log()
.map(created -> ServerSentEvent.<LibrarySummaryResponseDto>builder()
.id(created.getId())
.event("library creation")
.data(created).build())
.doOnComplete(() ->  {log.info("streaming completed");})
.doFinally(signal -> result.close());

只要在进行订阅查询时返回的对象与预期类型匹配,就应该很好!

请注意,这意味着您必须创建一个适合您的场景的响应对象。然而,响应是作为更新(通过QueryUpdateEmitter(发出的,还是作为返回订阅查询的映射操作发出的,则是另一个问题。

理想情况下,你应该将你的内部信息与你向外发送的信息解耦,就像SSE一样。要转到更具体的解决方案,您可以从Flux响应类型中获益。您可以简单地附加任何映射操作,将QueryUpdateEmitter发出的响应调整为所需的SSE格式。

最后,简短的回答是";是的,你可以;只要在CCD_ 4上调度订阅查询时发出的响应对象与期望的更新类型匹配即可。

相关内容

  • 没有找到相关文章

最新更新