我正在尝试使用web-flux构建一个微服务,该服务将根据特定订阅者的事件发送/发布一些数据。
通过下面的实现(另一个堆栈流问题(,我能够创建一个发布者,当我们通过调用"/send"API 触发事件时,所有订阅者将自动接收数据
@SpringBootApplication
@RestController
public class DemoApplication {
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}
@GetMapping("/send/{userId}")
public void test(@PathVariable("userId") String userId) {
sink.next("Hello World #" + counter.getAndIncrement());
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
问题陈述 - 我的应用程序具有基于用户的访问权限,对于每个用户,都会有一些通知,我只想根据事件推送这些通知。在这里,事件将与用户 ID 一起存储在数据库中,当我们从另一个 API 到达"发送"端点以及"userId"作为路径变量时,它应该只发送与该用户相关的数据,只有当它注册为订阅者并且仍在信道上侦听时。
这不是一个准确或实际的解决方案,但这个东西有效: 首先是上交所终点:
@RestController
public class SSEController {
private String value = "";
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> getWords() {
System.out.println("sse ?");
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> getValue());
}
}
从任何服务或任何可以自动连线的地方,只需自动连线:
@Service
public class SomeService {
@Autowired
private SSEController sseController;
...
void something(){
....
sseController.setValue("some value");
....
}
这就是我使用的方式。