为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 grpc 中正确设计发布-订阅模式?

如何在 grpc 中正确设计发布-订阅模式?

喵喵时光机 2022-07-14 17:05:20
我正在尝试使用 grpc 实现 pub sub 模式,但我对如何正确执行它有点困惑。我的原型: rpc call (google.protobuf.Empty) returns (stream Data);客户:asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {         @Override         public void onNext(Data value) {           // process a data         @Override         public void onError(Throwable t) {         }         @Override         public void onCompleted() {         }       });   } catch (StatusRuntimeException e) {     LOG.warn("RPC failed: {}", e.getStatus());   }   Thread.currentThread().join();服务器服务:public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {  private final BlockingQueue<Data> queue;  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();  public Sender(BlockingQueue<Data> queue) {    this.queue = queue;  }  @Override  public void data(Empty request, StreamObserver<Data> responseObserver) {    observers.add(responseObserver);  }  @Override  public void run() {    while (!Thread.currentThread().isInterrupted()) {      try {        // waiting for first element        Data data = queue.take();        // send head element        observers.forEach(o -> o.onNext(data));      } catch (InterruptedException e) {        LOG.error("error: ", e);        Thread.currentThread().interrupt();      }    }  }}如何正确地从全局观察者中删除客户?连接断开时如何接收某种信号?如何管理客户端-服务器重新连接?连接断开时如何强制客户端重新连接?
查看完整描述

1 回答

?
慕虎7371278

TA贡献1802条经验 获得超4个赞

在实施您的服务时:


  @Override

  public void data(Empty request, StreamObserver<Data> responseObserver) {

    observers.add(responseObserver);

  }

您需要获取当前请求的上下文,并监听取消。对于单请求、多响应调用(又名服务器流),gRPC 生成的代码被简化为直接传递请求。这意味着您无法直接访问底层ServerCall.Listener,这就是您通常如何监听客户端断开和取消的方式。


相反,每个 gRPC 调用都有一个Context与之关联的,它携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后从链接的哈希集中安全地删除响应观察器。


至于重新连接:如果连接断开,gRPC 客户端会自动重新连接,但通常不会重试 RPC,除非这样做是安全的。在服务器流式 RPC 的情况下,这样做通常不安全,因此您需要直接在客户端上重试 RPC。


查看完整回答
反对 回复 2022-07-14
  • 1 回答
  • 0 关注
  • 212 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号