为了账号安全,请及时绑定邮箱和手机立即绑定

Golang grpc ServerStream 抛出错误:传输:流已完成或已调用

Golang grpc ServerStream 抛出错误:传输:流已完成或已调用

Go
撒科打诨 2022-07-11 15:14:15
我的 grpc 服务器流功能出现了一个有趣的错误,我正在抓狂。通过阅读 grpc godoc 或在线其他地方无法找到任何可能的原因。希望更熟悉 Go 和 grpc 流的人能够为我指明正确的方向。我的实现尝试遵循 grpc.io 网站上的基本 Server-Streaming 示例。有问题的protobuf定义:service ArrayBasedCache {  ...  rpc GetRecord (GetRecordRequest) returns (stream MessageResponse) {}}message GetRecordRequest {  string key = 1;}message MessageResponse {  string message = 1;}grpc 服务器处理程序:func (ctlr *cacheClientController) GetRecord(req *svcgrpc.GetRecordRequest, stream svcgrpc.ArrayBasedCache_GetRecordServer) error {    key := req.GetKey()    if ctlr.inputChannels[key] == nil {        return errors.New("Requested record has expired")    }    msgs, e1 := ctlr.client.ReadArrayRecord(key)    if e1 != nil {        panic(e1)    }    log.Printf("Messages: %v", msgs)    for i := 0; i < len(msgs); i++ {        log.Printf("trying to write message: %v", msgs[i])        if e2 := stream.Send(&svcgrpc.MessageResponse{Message: msgs[i]}); e2 != nil {            log.Printf("Writing message %d of %d to stream failed", i+1, len(msgs))            panic(e2)        }    }    return nil}我的 grpc 客户端实现:func (s *GrpcService) GetRecord(key string) (svcgrpc.ArrayBasedCache_GetRecordClient, error) {    req := &svcgrpc.GetRecordRequest{Key: key}    ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)    defer cancelFunc()    resp, err := s.grpcClient.GetRecord(ctx, req)    if err != nil {        return nil, err    }    return resp, nil}func (s *GrpcService) StreamToArray(stream svcgrpc.ArrayBasedCache_GetRecordClient) ([]string, error) {    out := []string{}    for {        msg, err := stream.Recv()        if err == io.EOF {            break        }        if err != nil {            return nil, errors.New("Unexpected error reading stream")        }        out = append(out, msg.Message)    }    return out, nil}
查看完整描述

2 回答

?
蝴蝶刀刀

TA贡献1801条经验 获得超8个赞

我相信问题出在GetRecordgrpc 客户端实现中,特别是您正在使用的上下文实现。

通过在同一方法中使用context.WithTimeout和调用,您基本上是在从方法defer cancelFunc()返回之前关闭流。GetRecord

如果您仍想使用context.WithTimeout实现,请不要使用方法cancelFunc内部GetRecord,而是cancelFunc从它返回或传递ctxtoGetRecord方法。


查看完整回答
反对 回复 2022-07-11
?
RISEBY

TA贡献1856条经验 获得超5个赞

由于流意味着长时间运行 - 实时返回结果 - 通常您不希望对流结果设置截止日期。


如果您想对Dial建立流连接的操作设置超时,您可以通过DialOption WithTimeout执行此操作:


grpc.Dial(addr, grpc.WithTimeout(defaultTimeout))

或使用DialContext:


ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)

defer cancelFunc()


grpc.DialContext(ctx, addr)


查看完整回答
反对 回复 2022-07-11
  • 2 回答
  • 0 关注
  • 113 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信