从另一个 API 端点 - Spring Webflux 动态触发现有 Flux



我正在尝试使用web-flux构建一个微服务,该服务将根据特定订阅者的事件发送/发布一些数据。

通过下面的实现(另一个堆栈流问题(,我能够创建一个发布者,当我们通过调用"/send"API 触发事件时,所有订阅者将自动接收数据

@SpringBootApplication
@RestController
public class DemoApplication {
    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    public DemoApplication() {
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    }
    @GetMapping("/send/{userId}")
    public void test(@PathVariable("userId") String userId) {
        sink.next("Hello World #" + counter.getAndIncrement());
    }
    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return processor.map(e -> ServerSentEvent.builder(e).build());
    }
}

问题陈述 - 我的应用程序具有基于用户的访问权限,对于每个用户,都会有一些通知,我只想根据事件推送这些通知。在这里,事件将与用户 ID 一起存储在数据库中,当我们从另一个 API 到达"发送"端点以及"userId"作为路径变量时,它应该只发送与该用户相关的数据,只有当它注册为订阅者并且仍在信道上侦听时。

这不是一个准确或实际的解决方案,但这个东西有效: 首先是上交所终点:

@RestController
public class SSEController {
    private String value = "";
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<String> getWords() {
        System.out.println("sse ?");
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> getValue());
    }
}

从任何服务或任何可以自动连线的地方,只需自动连线:

@Service
public class SomeService {
    @Autowired
    private SSEController sseController;
    ...
    void something(){
        ....
        sseController.setValue("some value");
        ....
    }

这就是我使用的方式。

最新更新