我的 grpc 服务器流功能出现了一个有趣的错误,我正在抓狂。通过阅读 grpc godoc 或在线其他地方无法找到任何可能的原因。希望更熟悉 Go 和 grpc 流的人能够为我指明正确的方向。我的实现尝试遵循 grpc.io 网站上的基本 Server-Streaming 示例。有问题的protobuf定义:service ArrayBasedCache { ... rpc GetRecord (GetRecordRequest) returns (stream MessageResponse) {}}message GetRecordRequest { string key = 1;}message MessageResponse { string message = 1;}grpc 服务器处理程序:func (ctlr *cacheClientController) GetRecord(req *svcgrpc.GetRecordRequest, stream svcgrpc.ArrayBasedCache_GetRecordServer) error { key := req.GetKey() if ctlr.inputChannels[key] == nil { return errors.New("Requested record has expired") } msgs, e1 := ctlr.client.ReadArrayRecord(key) if e1 != nil { panic(e1) } log.Printf("Messages: %v", msgs) for i := 0; i < len(msgs); i++ { log.Printf("trying to write message: %v", msgs[i]) if e2 := stream.Send(&svcgrpc.MessageResponse{Message: msgs[i]}); e2 != nil { log.Printf("Writing message %d of %d to stream failed", i+1, len(msgs)) panic(e2) } } return nil}我的 grpc 客户端实现:func (s *GrpcService) GetRecord(key string) (svcgrpc.ArrayBasedCache_GetRecordClient, error) { req := &svcgrpc.GetRecordRequest{Key: key} ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout) defer cancelFunc() resp, err := s.grpcClient.GetRecord(ctx, req) if err != nil { return nil, err } return resp, nil}func (s *GrpcService) StreamToArray(stream svcgrpc.ArrayBasedCache_GetRecordClient) ([]string, error) { out := []string{} for { msg, err := stream.Recv() if err == io.EOF { break } if err != nil { return nil, errors.New("Unexpected error reading stream") } out = append(out, msg.Message) } return out, nil}
2 回答
蝴蝶刀刀
TA贡献1801条经验 获得超8个赞
我相信问题出在GetRecord
grpc 客户端实现中,特别是您正在使用的上下文实现。
通过在同一方法中使用context.WithTimeout
和调用,您基本上是在从方法defer cancelFunc()
返回之前关闭流。GetRecord
如果您仍想使用context.WithTimeout
实现,请不要使用方法cancelFunc
内部GetRecord
,而是cancelFunc
从它返回或传递ctx
toGetRecord
方法。
RISEBY
TA贡献1856条经验 获得超5个赞
由于流意味着长时间运行 - 实时返回结果 - 通常您不希望对流结果设置截止日期。
如果您想对Dial建立流连接的操作设置超时,您可以通过DialOption WithTimeout执行此操作:
grpc.Dial(addr, grpc.WithTimeout(defaultTimeout))
或使用DialContext:
ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)
defer cancelFunc()
grpc.DialContext(ctx, addr)
- 2 回答
- 0 关注
- 113 浏览
添加回答
举报
0/150
提交
取消