Golang grpc ServerStream抛出错误:传输:流已完成或已调用WriteHeader



我的grpc服务器流函数出现了一个有趣的错误,我简直要崩溃了。在阅读grpc godoc或其他在线内容时,未能找到任何可能的原因。希望有一个更熟悉围棋和grpc流的人能够为我指明正确的方向。

我的实现尝试遵循grp.io网站上的基本服务器流示例。

有问题的protobuf定义:

service ArrayBasedCache {
...
rpc GetRecord (GetRecordRequest) returns (stream MessageResponse) {}
}
message GetRecordRequest {
string key = 1;
}
message MessageResponse {
string message = 1;
}

grpc服务器处理程序:

func (ctlr *cacheClientController) GetRecord(req *svcgrpc.GetRecordRequest, stream svcgrpc.ArrayBasedCache_GetRecordServer) error {
key := req.GetKey()
if ctlr.inputChannels[key] == nil {
return errors.New("Requested record has expired")
}
msgs, e1 := ctlr.client.ReadArrayRecord(key)
if e1 != nil {
panic(e1)
}
log.Printf("Messages: %v", msgs)
for i := 0; i < len(msgs); i++ {
log.Printf("trying to write message: %v", msgs[i])
if e2 := stream.Send(&svcgrpc.MessageResponse{Message: msgs[i]}); e2 != nil {
log.Printf("Writing message %d of %d to stream failed", i+1, len(msgs))
panic(e2)
}
}
return nil
}

我的grpc客户端实现:

func (s *GrpcService) GetRecord(key string) (svcgrpc.ArrayBasedCache_GetRecordClient, error) {
req := &svcgrpc.GetRecordRequest{Key: key}
ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)
defer cancelFunc()
resp, err := s.grpcClient.GetRecord(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (s *GrpcService) StreamToArray(stream svcgrpc.ArrayBasedCache_GetRecordClient) ([]string, error) {
out := []string{}
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, errors.New("Unexpected error reading stream")
}
out = append(out, msg.Message)
}
return out, nil
}

我的测试客户端中的入口点

stream, err := c.GetRecord(testKey)
if err != nil {
log.Printf("Retrieving record stream %v failed", testKey)
}
arr, err := c.StreamToArray(stream)
if err != nil {
log.Printf("Failed to unmarshall message stream to array")
}
log.Printf("Retrieved messages %v", arr)

遇到的错误:

2020/10/26 22:43:36 Messages: [abc def hij]
2020/10/26 22:43:36 trying to write message: abc
2020/10/26 22:43:36 Writing message 1 of 3 to stream failed
panic: rpc error: code = Internal desc = transport: transport: the stream is done or WriteHeader was already called
goroutine 82 [running]:
github.com/TasSM/app/service/controller.(*cacheClientController).GetRecord(0xc0001e4cc0, 0xc000176fc0, 0xd6ef80, 0xc00022e5f0, 0xc0001e4cc0, 0xc00039fa00)
C:/Users/myuser/Documents/repos/app/src/service/controller/controller.go:96 +0x5ad
github.com/TasSM/app/service/svcgrpc._ArrayBasedCache_GetRecord_Handler(0xc11a80, 0xc0001e4cc0, 0xd6ca00, 0xc0001dc300, 0x11dec30, 0xc000392000)
C:/Users/myuser/Documents/repos/app/src/service/svcgrpc/cacheservice_grpc.pb.go:192 +0x13d
google.golang.org/grpc.(*Server).processStreamingRPC(0xc0001a7a40, 0xd70300, 0xc000108c00, 0xc000392000, 0xc0001e4de0, 0x119fb00, 0x0, 0x0, 0x0)
C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:1457 +0x15cb
google.golang.org/grpc.(*Server).handleStream(0xc0001a7a40, 0xd70300, 0xc000108c00, 0xc000392000, 0x0)
C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:1537 +0x1309
google.golang.org/grpc.(*Server).serveStreams.func1.2(0xc000123140, 0xc0001a7a40, 0xd70300, 0xc000108c00, 0xc000392000)
C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:871 +0xe0
created by google.golang.org/grpc.(*Server).serveStreams.func1
C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:869 +0x349
exit status 2

我认为问题出在GetRecordgrpc客户端实现中,特别是您正在使用的上下文实现中。

通过使用context.WithTimeout和在同一方法中调用defer cancelFunc(),基本上就是在从GetRecord方法返回之前关闭流。

如果仍要使用context.WithTimeout实现,请不要在GetRecord方法中使用cancelFunc,而是从中返回cancelFunc,或者将ctx传递给GetRecord方法。

@eminlala确定了流终止的根本原因。

由于流是长时间运行的(实时返回结果(,因此一般情况下,您不希望对流式结果设置最后期限。

如果您想在建立流连接的Dial操作上设置超时,请通过DialOptionWithTimeout:执行此操作

grpc.Dial(addr, grpc.WithTimeout(defaultTimeout))

或使用DialContext:

ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)
defer cancelFunc()
grpc.DialContext(ctx, addr)

最新更新