Netflix DGS Subscriptions



我正在遵循DGS订阅的文档,没有收到任何错误,但也没有返回任何数据。

设置非常简单。在schema.graphqls文件中,我定义了订阅:

type Subscription {
ratings: Rating
}
type Rating {
stars: Int
}

Java代码如下:

@DgsComponent
public class SubscriptionDataFetcher {
@DgsData(parentType = "Subscription", field = "ratings")
public Publisher<Rating> ratings() {
return Flux.interval(Duration.ofSeconds(1)).map(t -> new Rating(4));
}
}

如果我现在把一个websocket连接到我的后端,它连接得很好,但没有像预期的那样返回任何数据(不管我怎么做,也尝试过使用JavaScript,也连接得很不错,但没有得到任何数据(。

例如,使用curl进行连接(但使用JavaScript结果相同,连接但没有数据(:

curl -o - --http1.1 
--include 
--no-buffer 
--header "Connection: Upgrade" 
--header "Upgrade: websocket" 
--header "Host: localhost:8443" 
--header "Origin: https://localhost:8443" 
--header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" 
--header "Sec-WebSocket-Version: 13" 
https://localhost:8443/subscriptions?query=ewoicXVlcnkiOiAic3Vic2NyaXB0aW9uIHsgIHN0b2NrcyB7bmFtZX0gfSIKfQ==

我也尝试过通过Graphiql接口进行连接,但出现了一个错误:

subscription {
ratings {
stars
}
}

错误消息:

{
"message": "response is not defined",
"stack": "ReferenceError: response is not definedn    at https://localhost:8443/graphiql:46:35n    at async Object.onRun (https://unpkg.com/graphiql/graphiql.min.js:1:540500)"
}

从链接中的示例中,我不清楚的另一件事是如何实际管理订阅。例如,如果发生突变,我想发布一个通知。任何关于如何使用Netflix DGS框架实现这一点的建议也将不胜感激。

不幸的是,DGS附带的graphicql接口似乎无法正确处理订阅-如果您将playground-spring-boot-starter添加到项目中,/playground将提供一个更完善的工具,它完全支持订阅。如果你在那里尝试订阅,它应该会起作用(假设你已经根据文档添加了graphql-dgs-subscriptions-websockets-autoconfigure(。

关于您的第二个问题,即如果发生突变,如何发布通知——不幸的是,文档中缺少这一点,但示例repo中有一个示例。

我把这个例子精简了一点。如果你想支持订阅&像这样的突变:

@DgsSubscription
public Publisher<Review> reviewAdded() {
return reviewsService.getReviewsPublisher();
}
@DgsMutation
public Review addReview(@InputArgument SubmittedReview review) {
return reviewsService.saveReview(review);
}

在您的服务中,您将创建一个Flux(返回给订阅者(,并保留对其发射器的引用,以便在发生突变时可以调用下一个。

@Service
public class ReviewsService {
private FluxSink<Review> reviewsStream;
private ConnectableFlux<Review> reviewsPublisher;
@PostConstruct
public void init() {
Flux<Review> publisher = Flux.create(emitter -> {
reviewsStream = emitter;
});
reviewsPublisher = publisher.publish();
reviewsPublisher.connect();
}
public Review saveReview(SubmittedReview reviewInput) {
Review review = Review.newBuilder()
.username(reviewInput.getUsername())
.starScore(reviewInput.getStarScore())
.submittedDate(OffsetDateTime.now()).build();
// Save to the database, etc.
reviewsStream.next(review); // publishes the review to subscribers
return review;
}
public Publisher<Review> getReviewsPublisher() {
return reviewsPublisher;
}
}

最新更新