我正在编写一个程序,该程序从 CLI 命令读取 stderr 并通过 gRPC 流将 stderr 日志流式传输到客户端。cmd 实例化如下(CLI 命令需要配置,我将其作为 stdin 传递):ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Duration(t)*time.Second)defer cancel()cmd := exec.CommandContext(ctxTimeout, "java", "-jar", "/opt/myapp/myapp.jar", "scan", "-config", "-",)cmd.Stdin = config我使用两个单独的缓冲区:一个将 stderr“实时”流式传输到客户端,另一个将日志保存在数据库中。为此,我使用 io.MultiWriter 并将其映射到 cmd 标准输入:bufStream := bytes.NewBuffer(make([]byte, 0, 4096))bufPersist := new(bytes.Buffer)stderr := io.MultiWriter(bufStream, bufPersist)// Map the command Standard Error Output to the multiwritercmd.Stderr = stderr最后,在启动命令之前,我有一个 goroutine,它使用 bufio.Scanner 来读取 stderr 缓冲区并通过 gRPC 逐行流式传输:// Go Routine to stream the scan job logsgo func() { for { select { case <-done: return default: scanner := bufio.NewScanner(bufStream) for scanner.Scan() { time.Sleep(3 * time.Second) logging.MyAppLog("warning", "%v", scanner.Text()) _ = stream.Send(&agentpb.ScanResultsResponse{ ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{ScanLogs: scanner.Bytes()}, }, ) } } }}()err := cmd.Run()done <- true我的问题是我必须time.sleep(time.Seconds * 3)在 goroutine 中使用才能获得正确的输出。如果没有,我得到的输出顺序不正确并被截断。我相信这是由于 io.multiwriter 和 bufio.scanner 不“同步”,但我想要一些有关最佳方法的指导。提前致谢。
1 回答
慕雪6442864
TA贡献1812条经验 获得超5个赞
来自扫描仪文档:
Bytes 返回调用 Scan 生成的最新令牌。底层数组可能指向将被后续调用 Scan 覆盖的数据。它不进行分配。
gRPC 有自己的缓冲。这意味着当 Send 返回时,字节不一定已写入线路,并且下一个 Scan 调用会修改尚未写入的字节。
复制 Scan 返回的字节,应该没问题:
for scanner.Scan() {
b := append([]byte(nil), scanner.Bytes()...)
stream.Send(&agentpb.ScanResultsResponse{
ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{
ScanLogs: b,
},
})
}
- 1 回答
- 0 关注
- 107 浏览
添加回答
举报
0/150
提交
取消