Spring引导reactive和EventSource的工作示例



我正在尝试使用反应式mongodb和EventSource来实现一个可工作的spring引导。然而,我面临着重复重新打开连接的问题,因为它已被服务器关闭。我甚至怀疑这是否真的可行,因为我没有找到任何具有反应数据库和事件源的工作示例。。。

你能给我指一个工作示例吗?或者告诉我我的代码出了什么问题?

这里的主要部分代码:

pom.xml

<properties>
<java.version>1.8</java.version>
<junit-jupiter.version>5.3.2</junit-jupiter.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
</parent>
<dependencies>
<!-- webflux reactive -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- thymeleaf -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<!-- exclude junit 4, prefer junit 5 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- junit 5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

正如您在pom中看到的,我使用的是嵌入式tomcat(我已经尝试过Netty,默认的春季启动服务器…(。此外,我正在将该应用程序部署到任何远程服务器,但只是在本地(windows 10(上尝试。

Web:

let source = new EventSource("/comment/stream");
source.addEventListener("message", function (event) {
// These events are JSON, so parsing and DOM fiddling are needed
var comment = JSON.parse(event.data);
console.log(comment ); 
});
source.addEventListener("error", function (event) {
console.log("error", event);
this.close();
});

RestController:

@RestController
public class CommentController {
@Autowired
private CommentRepository commentRepository;
@PostMapping(path = "/comment")
public Mono<Comment> comment(@RequestBody Comment comment) {
return this.commentRepository.save(comment);
}
@GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Comment> feed() {
return this.commentRepository.findAll();
}
}

数据库存储库:

@Repository
public interface CommentRepository extends ReactiveSortingRepository<Comment, String> {
Flux<Comment> findAll();
}

同样,使用EventSource的web客户端每秒都会保持重新连接,因为服务器关闭了连接。

谢谢!

我真的不确定,你给我们的关于你的连接关闭原因的信息太少了。没有日志,也没有透露任何关于它部署位置的信息。

我只会根据个人经验来回答这个问题。我在heroku部署了一个使用事件流的应用程序,他们在每个应用程序前面都有一个代理/负载均衡器,它会杀死任何在60秒后没有发送任何内容的连接。

正如这里提到的,为什么事件源在30-60秒后关闭,这证实了我所注意到的。

为了解决这个问题,如果使用websockets实现ping/pong消息,或者如果像我一样使用ServerSentEvents,我实现了keep-alive消息。

.GET("", accept(TEXT_EVENT_STREAM), request -> ok()
.contentType(TEXT_EVENT_STREAM)
.header("Cache-Control", "no-transform")
.body(Flux.merge(myHandler.getEvents()), 
Flux.interval(Duration.ofSeconds(15))
.map(aLong -> ServerSentEvent.builder()
.comment("keep alive")
.build())),
new ParameterizedTypeReference<List<MyClass>>() {}))

我从我的一个项目中获得了这个代码片段。在这里,你可以看到我将一个通量与我的当前流合并,该通量在给定的时间间隔(15秒(将发出一个只有一个保持活动注释的ServerSentEvent。由于这是一个评论,它将被客户端忽略。

只需提及,常规流myHandler.getEvents返回包裹在ServerSentEvents中的数据。

最新更新