1 回答
TA贡献1834条经验 获得超8个赞
只要只有工作人员发起连接,您就不必担心 NAT。gRPC 支持任一方向(或两者)的流式传输。这意味着您的所有需求都可以仅使用调度程序上的一台服务器来实现;调度程序不需要连接回工作人员。
根据您的描述,您的服务可能如下所示:
syntax = "proto3";
import "google/protobuf/empty.proto";
service Scheduler {
rpc GetJobs(GetJobsRequest) returns (stream GetJobsResponse) {}
rpc ReportWorkerStatus(stream ReportWorkerStatusRequest) returns (google.protobuf.Empty) {}
rpc ReportJobStatus(stream JobStatus) returns (stream JobAction) {}
}
enum JobType {
JOB_TYPE_UNSPECIFIED = 0;
JOB_TYPE_CONSOLE = 1;
JOB_TYPE_EXEC = 2;
}
message GetJobsRequest {
// List of job types this worker is willing to accept.
repeated JobType types = 1;
}
message GetJobsResponse {
string jobId = 0;
JobType type = 1;
string fileName = 2;
bytes fileContent = 3;
// etc.
}
message ReportWorkerStatusRequest {
float cpuLoad = 0;
uint64 availableDiskSpace = 1;
uint64 availableMemory = 2;
// etc.
// List of filenames or file hashes, or whatever else you need to precisely
// report the presence of files.
repeated string haveFiles = 2;
}
其中大部分是偏好问题(oneof例如,您可以使用而不是枚举),但希望从客户端到服务器的单个连接足以满足您的要求。
维护一组可用的工作人员非常简单:
func (s *Server) GetJobs(req *pb.GetJobRequest, stream pb.Scheduler_GetJobsServer) error {
ctx := stream.Context()
s.scheduler.AddWorker(req)
defer s.scheduler.RemoveWorker(req)
for {
job, err := s.scheduler.GetJob(ctx, req)
switch {
case ctx.Err() != nil: // client disconnected
return nil
case err != nil:
return err
}
if err := stream.Send(job); err != nil {
return err
}
}
}
基础教程包括所有类型流的示例,包括 Go 中的服务器和客户端实现。
至于注册,这通常只是意味着创建某种凭证,供工作人员在与服务器通信时使用。这可能是一个随机生成的令牌(服务器可以使用它来加载关联的元数据),或者用户名/密码组合,或者 TLS 客户端证书,或类似的。详细信息将取决于您的基础设施和设置工作人员时所需的工作流程。
- 1 回答
- 0 关注
- 78 浏览
添加回答
举报