为了账号安全,请及时绑定邮箱和手机立即绑定

Go并发编程案例解析

麦可同学 全栈工程师
难度中级
时长 1小时58分
学习人数
综合评分9.40
40人评价 查看评价
9.5 内容实用
9.1 简洁易懂
9.6 逻辑清晰
  • Golang 并发实现

    gorountine channels select

    查看全部
  • 常见并发模型

    1,进程和线程模型,例如apache

    2,异步非阻塞,例如nginx,nodejs

    3,协程,例如golang,erlang,lua


    查看全部
  • package main
    
    import (
       "strings"
       "fmt"
       "time"
       "os"
       "bufio"
       "io"
       "regexp"
       "log"
       "strconv"
       "net/url"
       "github.com/influxdata/influxdb/client/v2"
       "flag"
       "net/http"
       "encoding/json"
    )
    
    type Reader interface {
       Read(rc chan []byte)
    }
    
    type Writer interface {
       Write(wc chan *Message)
    }
    
    type ReadFromFile struct {
       path string //log file path
    }
    
    type WriteToInfluxDB struct {
       influxDBDsn string //influx data source
    }
    
    type Message struct {
       TimeLocal time.Time
       BytesSent int
       Path, Method, Scheme, Status string
       UpstreamTime, RequestTime float64
    }
    
    type LogProcess struct {
       rc chan []byte //read channel
       wc chan *Message //write channel
       read Reader
       write Writer
    }
    
    type SystemInfo struct {
       HandleLine int `json:"handleLine"` //总处理日志行数
       Tps float64 `json:"tps"` //系统吞吐量
       ReadChanlen int `json:"readChanlen"` //读信道长度
       WriteChanlen int `json:"writeChanlen"` //写信道长度
       RunTime string `json:"runTime"` //总运行时间
       ErrNum int `json:"errNum"` //错误数
    }
    
    type Monitor struct {
       startTime time.Time
       data SystemInfo
       tpSli []int
    }
    
    const (
       TypeHandleLine = 0
       TypeErrNum = 1
    )
    
    var TypeMonitorChan = make(chan int, 200)
    
    func (m *Monitor) start(lp *LogProcess) {
       go func() {
          for n := range TypeMonitorChan{
             switch n {
             case TypeErrNum:
                m.data.ErrNum += 1
             case TypeHandleLine:
                m.data.HandleLine += 1
             }
          }
       }()
    
       ticker := time.NewTicker(5*time.Second)
       go func() {
          for {
             <-ticker.C
             m.tpSli = append(m.tpSli, m.data.HandleLine)
             if len(m.tpSli) > 2 {
                m.tpSli = m.tpSli[1:]
             }
          }
       }()
    
       http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
          m.data.RunTime = time.Now().Sub(m.startTime).String()
          m.data.ReadChanlen = len(lp.rc)
          m.data.WriteChanlen = len(lp.wc)
          if len(m.tpSli) >= 2 {
             m.data.Tps = float64(m.tpSli[1]-m.tpSli[0]) / 5
          }
    
          ret, _ := json.MarshalIndent(m.data, "", "\t")
          io.WriteString(writer, string(ret))
       })
    
       http.ListenAndServe(":9193", nil)
    }
    
    func (r *ReadFromFile) Read(rc chan []byte) {
       //log reading module
    
       //打开文件
       f, err := os.Open(r.path)
       if err!= nil {
          TypeMonitorChan <- TypeErrNum
          panic(fmt.Sprintf("open file error:%s", err.Error()))
       }
    
       //从文件末尾开始逐行读取文件内容
       f.Seek(0, 2)
       rd := bufio.NewReader(f)
    
       for {
          line, err := rd.ReadBytes('\n')
          if err == io.EOF {
             time.Sleep(500*time.Millisecond)
             continue
          } else if err != nil {
             TypeMonitorChan <- TypeErrNum
             panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
          }
          TypeMonitorChan <- TypeHandleLine
          rc <- line[:len(line)-1]
       }
    }
    
    func (w *WriteToInfluxDB) Write(wc chan *Message) {
       //log writing module
    
       infSli := strings.Split(w.influxDBDsn, "@")
    
       // Create a new HTTPClient
       c, err := client.NewHTTPClient(client.HTTPConfig{
          Addr:     infSli[0],
          Username: infSli[1],
          Password: infSli[2],
       })
       if err != nil {
          TypeMonitorChan <- TypeErrNum
          log.Fatal(err)
       }
       defer c.Close()
    
       // Create a new point batch
       bp, err := client.NewBatchPoints(client.BatchPointsConfig{
          Database:  infSli[3],
          Precision: infSli[4],
       })
       if err != nil {
          TypeMonitorChan <- TypeErrNum
          log.Fatal(err)
       }
    
       for v := range wc {
          // Create a point and add to batch
          tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
          fields := map[string]interface{}{
             "UpstreamTime": v.UpstreamTime,
             "RequestTime": v.RequestTime,
             "BytesSent": v.BytesSent,
          }
    
          pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
          if err != nil {
             log.Fatal(err)
          }
          bp.AddPoint(pt)
    
          // Write the batch
          if err := c.Write(bp); err != nil {
             TypeMonitorChan <- TypeErrNum
             log.Fatal(err)
          }
    
          // Close client resources
          if err := c.Close(); err != nil {
             TypeMonitorChan <- TypeErrNum
             log.Fatal(err)
          }
          log.Println("Write success")
       }
    }
    
    func (l *LogProcess) Process() {
       //log parsing module
    
       r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
       loc, _ := time.LoadLocation("Asia/Shanghai")
       for v := range l.rc {
          ret := r.FindStringSubmatch(string(v))
          if len(ret) != 13 {
             TypeMonitorChan <- TypeErrNum
             log.Println("FindStringSubMatch fail:", string(v))
             continue
          }
          message := &Message{}
          t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[4], loc)
          if err != nil {
             TypeMonitorChan <- TypeErrNum
             log.Println("ParseInLocation fail", err.Error(), ret[4])
             continue
          }
    
          message.TimeLocal = t
    
          byteSent, _ := strconv.Atoi(ret[7])
          message.BytesSent = byteSent
    
          reqSli := strings.Split(ret[5], " ")
          if len(reqSli) != 3 {
             TypeMonitorChan <- TypeErrNum
             log.Println("strings.Split fail", ret[5])
             continue
          }
          message.Method = reqSli[0]
    
          u, err := url.Parse(reqSli[1])
          if err != nil {
             log.Println("url parse fail", err)
             continue
          }
          message.Path = u.Path
    
          //message.Scheme = ret[5]
          message.Status = ret[6]
    
          upstreamTime, _ := strconv.ParseFloat(ret[11], 64)
          requestTime, _ := strconv.ParseFloat(ret[12], 64)
          message.UpstreamTime = upstreamTime
          message.RequestTime = requestTime
          l.wc <- message
       }
    
       //127.0.0.1 - - [14/May/2018:14:52:45 +0800] "GET /foo?query=t HTTP/1.1" 404 27 "-" "KeepAliveClient" "-" 1.005 1.854
       //([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)
    }
    
    func main() {
       var path, influxDsn string
       flag.StringVar(&path, "path", "/usr/local/var/log/nginx/sd-mac-access-8081.log", "read file path")
       flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@wangxu@wangxu26@imooc@s", "influx data source")
       flag.Parse()
    
       r := &ReadFromFile{
          path:path,
       }
    
       w := &WriteToInfluxDB{
          influxDBDsn:influxDsn,
       }
    
       lp := &LogProcess{
          //使用带缓存的channel,防止阻塞
          rc:make(chan []byte, 200),
          wc:make(chan *Message, 200),
          read:r,
          write:w,
       }
    
       //开两个读goroutines
       for i := 0; i < 2; i++ {
          go lp.read.Read(lp.rc)
       }
    
       go lp.Process()
    
       //读比写要快,多开几个写的goroutines
       for i := 0; i < 4; i++ {
          go lp.write.Write(lp.wc)
       }
    
       //time.Sleep(30*time.Second)
       m := Monitor{
          startTime:time.Now(),
          data:SystemInfo{},
       }
       m.start(lp)
    }


    查看全部
  • 常见的并发模型

    1,进程和线程模型,例如apache

    2,异步非阻塞,例如nginx,nodejs

    3,协程,例如golang,erlang,lua

    查看全部
  • golang分析nginx日志

    使用influxdb存储

    使用grafana展示

    查看全部
    0 采集 收起 来源:课程介绍

    2018-10-22

  • 任务的合理的拆分

    查看全部
  • Influxdb关键概念

    查看全部
  • Influxdb简介

    查看全部
  • 日志监控系统——写入模块

    查看全部
  • 日志监控系统——解析模块

    查看全部
  • 日志监控系统——读取模块的实现

    查看全部
    0 采集 收起 来源:读取模块实现

    2018-10-04

  • golang中的面向对象

    查看全部
  • 并发与并行

    查看全部
  • golang并发实现——select

    查看全部
  • gplang并发实现——channels

    查看全部

举报

0/150
提交
取消
课程须知
1、课程难度属于中级 2、有一定的编程经验,了解Golang基本语法
老师告诉你能学到什么?
1、常见的并发编程模型 2、并发与并行的异同 3、Golang的面向对象 4、Golang并发编程知识和设计 5、实现一个简单的日志监控程序

微信扫码,参与3人拼团

意见反馈 帮助中心 APP下载
官方微信
友情提示:

您好,此课程属于迁移课程,您已购买该课程,无需重复购买,感谢您对慕课网的支持!