我正在尝试使用 grpc 实现 pub sub 模式,但我对如何正确执行它有点困惑。我的原型: rpc call (google.protobuf.Empty) returns (stream Data);客户:asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() { @Override public void onNext(Data value) { // process a data @Override public void onError(Throwable t) { } @Override public void onCompleted() { } }); } catch (StatusRuntimeException e) { LOG.warn("RPC failed: {}", e.getStatus()); } Thread.currentThread().join();服务器服务:public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable { private final BlockingQueue<Data> queue; private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>(); public Sender(BlockingQueue<Data> queue) { this.queue = queue; } @Override public void data(Empty request, StreamObserver<Data> responseObserver) { observers.add(responseObserver); } @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { // waiting for first element Data data = queue.take(); // send head element observers.forEach(o -> o.onNext(data)); } catch (InterruptedException e) { LOG.error("error: ", e); Thread.currentThread().interrupt(); } } }}如何正确地从全局观察者中删除客户?连接断开时如何接收某种信号?如何管理客户端-服务器重新连接?连接断开时如何强制客户端重新连接?
1 回答

慕虎7371278
TA贡献1802条经验 获得超4个赞
在实施您的服务时:
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
您需要获取当前请求的上下文,并监听取消。对于单请求、多响应调用(又名服务器流),gRPC 生成的代码被简化为直接传递请求。这意味着您无法直接访问底层ServerCall.Listener,这就是您通常如何监听客户端断开和取消的方式。
相反,每个 gRPC 调用都有一个Context与之关联的,它携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后从链接的哈希集中安全地删除响应观察器。
至于重新连接:如果连接断开,gRPC 客户端会自动重新连接,但通常不会重试 RPC,除非这样做是安全的。在服务器流式 RPC 的情况下,这样做通常不安全,因此您需要直接在客户端上重试 RPC。
添加回答
举报
0/150
提交
取消