How to send the parent trace ID and span with gRPC Java



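I want to make requests between services, both streaming and unary, and pass the parent trace ID and span along with them. I am using proto3 with Java and Spring Boot. My code is as follows: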

@Service
public class GrpcService {
    @GrpcClient("square")
    private SquareRpcGrpc.SquareRpcBlockingStub blockingStub;
    @GrpcClient("square")
    private SquareRpcGrpc.SquareRpcStub asyncStub;

    public Object getSquareResponseUnary(int number){
        return IntStream.rangeClosed(1, number)
                .mapToObj(i -> Input.newBuilder().setNumber(i).build())
                .map(this.blockingStub::findSquareUnary)
                .collect(Collectors.toMap(
                        Output::getNumber,
                        Output::getResult
                ));
    }

    public Object getSquareResponseStream(int number){
        CompletableFuture<Map<Integer, Integer>> completableFuture = new CompletableFuture<>();
        OutputStreamingResponse outputStreamingResponse = new OutputStreamingResponse(
                new HashMap<>(),
                completableFuture
        );
        StreamObserver<Input> squareBiStream = this.asyncStub.findSquareBiStream(outputStreamingResponse);
        IntStream.rangeClosed(1, number)
                .mapToObj(i -> Input.newBuilder().setNumber(i).build())
                .forEach(squareBiStream::onNext);
        squareBiStream.onCompleted();
        return completableFuture;
    }
}

Since you are using Spring Boot, you can do this with Spring Cloud Sleuth and gRPC interceptors.

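  • Client
@Slf4j
class CustomForwardingClientCall<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {
    // Metadata entries are addressed by typed keys, not by plain strings
    private static final Metadata.Key<String> TRACE_ID_KEY =
            Metadata.Key.of("traceId", Metadata.ASCII_STRING_MARSHALLER);

    CustomForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
        super(delegate);
    }

    @Override
    protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {
        // ... get trace info from trace context and put trace info into header
        headers.put(TRACE_ID_KEY, "TRACE_ID"); // "TRACE_ID" is a placeholder value
        delegate().start(responseListener, headers);
    }
}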
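  • Server
public class CustomServerInterceptor implements ServerInterceptor {
    // Must use the same key name as the client side
    private static final Metadata.Key<String> TRACE_ID_KEY =
            Metadata.Key.of("traceId", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        // ... get trace info from metadata
        String traceId = metadata.get(TRACE_ID_KEY); // null if the client sent no trace header
        // put into trace context
        // Continue the call chain, then wrap the resulting listener
        ServerCall.Listener<ReqT> listener = serverCallHandler.startCall(serverCall, metadata);
        // CustomServerCallListener is this answer's own wrapper class; its definition is not shown here
        return new CustomServerCallListener<>(listener);
    }
}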
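
The two interceptors above leave open what the "trace info" string looks like on the wire. As a rough sketch only: the class below packs a trace ID, span ID, and parent span ID into one header value laid out like the B3 single-header format ({TraceId}-{SpanId}-{SamplingState}-{ParentSpanId}), and parses it back on the receiving side. TraceInfo and all of its methods are hypothetical names made up for illustration; they are not part of Sleuth, Brave, or grpc-java. The sampling flag is hard-coded to "1" as a simplifying assumption, and the code uses only the JDK.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical value type for illustration; not a Sleuth, Brave, or grpc-java class. */
final class TraceInfo {
    final String traceId;      // shared by every span that belongs to one request
    final String spanId;       // identifies the current unit of work
    final String parentSpanId; // null for the root span

    TraceInfo(String traceId, String spanId, String parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    /** Starts a new trace: fresh trace ID, fresh span ID, no parent. */
    static TraceInfo newRoot() {
        return new TraceInfo(randomId(), randomId(), null);
    }

    /** Creates a child span: same trace ID, new span ID, current span becomes the parent. */
    TraceInfo newChild() {
        return new TraceInfo(traceId, randomId(), spanId);
    }

    /** Encodes the context as one header value; the sampling flag "1" is an assumption. */
    String toHeader() {
        return parentSpanId == null
                ? traceId + "-" + spanId
                : traceId + "-" + spanId + "-1-" + parentSpanId;
    }

    /** Parses a value produced by toHeader(); accepts 2, 3, or 4 dash-separated fields. */
    static TraceInfo fromHeader(String header) {
        String[] parts = header.split("-");
        if (parts.length < 2 || parts.length > 4) {
            throw new IllegalArgumentException("Unexpected trace header: " + header);
        }
        // Field 3 (index 2) is the sampling state; field 4 (index 3) is the parent span ID
        String parent = parts.length == 4 ? parts[3] : null;
        return new TraceInfo(parts[0], parts[1], parent);
    }

    /** Generates 16 lowercase hex characters (64 random bits). */
    private static String randomId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }
}
```

With something like this in place, the client interceptor could put current.newChild().toHeader() into the metadata (where current is whatever context the caller holds), and the server interceptor could call TraceInfo.fromHeader(...) on the value it reads back before storing the result in its own trace context. The same header travels with both unary and streaming calls, because metadata is sent once when the call starts.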
