-
Golang 并发实现
goroutine channels select
查看全部 -
常见并发模型
1,进程和线程模型,例如apache
2,异步非阻塞,例如nginx,nodejs
3,协程,例如golang,erlang,lua
查看全部 -
// Command main tails an nginx access log, parses every new line into a
// Message, writes it to InfluxDB and serves runtime statistics as JSON on
// http://:9193/monitor.
package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/influxdb/client/v2"
)

// Reader produces raw log lines on rc.
type Reader interface {
	Read(rc chan []byte)
}

// Writer consumes parsed messages from wc.
type Writer interface {
	Write(wc chan *Message)
}

// ReadFromFile is a Reader that tails a log file.
type ReadFromFile struct {
	path string // log file path
}

// WriteToInfluxDB is a Writer that stores messages in InfluxDB.
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source: addr@user@password@database@precision
}

// Message is one parsed access-log entry.
type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

// LogProcess wires a Reader and a Writer together through two channels.
type LogProcess struct {
	rc    chan []byte   // read channel: raw lines from the reader
	wc    chan *Message // write channel: parsed messages for the writer
	read  Reader
	write Writer
}

// SystemInfo is the JSON document served by the monitor endpoint.
type SystemInfo struct {
	HandleLine   int     `json:"handleLine"`   // total number of log lines handled
	Tps          float64 `json:"tps"`          // throughput, lines per second
	ReadChanlen  int     `json:"readChanlen"`  // current length of the read channel
	WriteChanlen int     `json:"writeChanlen"` // current length of the write channel
	RunTime      string  `json:"runTime"`      // total running time
	ErrNum       int     `json:"errNum"`       // number of errors
}

// Monitor collects counters from TypeMonitorChan and exposes them over HTTP.
type Monitor struct {
	startTime time.Time

	mu    sync.Mutex // guards data and tpSli, which are shared by three goroutines
	data  SystemInfo
	tpSli []int // the last two HandleLine samples, used to derive Tps
}

// Event kinds sent on TypeMonitorChan.
const (
	TypeHandleLine = 0
	TypeErrNum     = 1
)

// tpsSampleInterval is how often HandleLine is sampled for the Tps figure.
const tpsSampleInterval = 5 * time.Second

// TypeMonitorChan carries TypeHandleLine / TypeErrNum events to the Monitor.
var TypeMonitorChan = make(chan int, 200)

// accessLogRe matches one access-log line; it has 12 capture groups.
// Example of a matching line:
//
//	127.0.0.1 - - [14/May/2018:14:52:45 +0800] "GET /foo?query=t HTTP/1.1" 404 27 "-" "KeepAliveClient" "-" 1.005 1.854
var accessLogRe = regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)

// start launches the counter and sampling goroutines, registers the
// /monitor handler and then blocks serving HTTP on :9193. It only returns
// by terminating the process if the server cannot run.
func (m *Monitor) start(lp *LogProcess) {
	// Fold monitor events into the counters.
	go func() {
		for n := range TypeMonitorChan {
			m.mu.Lock()
			switch n {
			case TypeErrNum:
				m.data.ErrNum++
			case TypeHandleLine:
				m.data.HandleLine++
			}
			m.mu.Unlock()
		}
	}()

	// Sample HandleLine periodically, keeping only the two latest samples.
	ticker := time.NewTicker(tpsSampleInterval)
	go func() {
		for range ticker.C {
			m.mu.Lock()
			m.tpSli = append(m.tpSli, m.data.HandleLine)
			if len(m.tpSli) > 2 {
				m.tpSli = m.tpSli[1:]
			}
			m.mu.Unlock()
		}
	}()

	http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
		m.mu.Lock()
		m.data.RunTime = time.Since(m.startTime).String()
		m.data.ReadChanlen = len(lp.rc)
		m.data.WriteChanlen = len(lp.wc)
		if len(m.tpSli) >= 2 {
			m.data.Tps = float64(m.tpSli[1]-m.tpSli[0]) / tpsSampleInterval.Seconds()
		}
		snapshot := m.data // copy under the lock, encode outside it
		m.mu.Unlock()

		ret, err := json.MarshalIndent(snapshot, "", "\t")
		if err != nil {
			http.Error(writer, err.Error(), http.StatusInternalServerError)
			return
		}
		if _, err := io.WriteString(writer, string(ret)); err != nil {
			log.Println("monitor response write fail", err)
		}
	})

	// ListenAndServe only returns on failure; surface it instead of letting
	// main return silently.
	if err := http.ListenAndServe(":9193", nil); err != nil {
		log.Fatal(err)
	}
}

// Read tails the file at r.path, starting from its current end, and sends
// every complete line (without the trailing '\n') on rc. It never returns
// normally and panics if the file cannot be opened, positioned or read.
func (r *ReadFromFile) Read(rc chan []byte) {
	f, err := os.Open(r.path)
	if err != nil {
		TypeMonitorChan <- TypeErrNum
		panic(fmt.Sprintf("open file error:%s", err.Error()))
	}
	defer f.Close()

	// Start at the end of the file so only newly appended lines are read.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		TypeMonitorChan <- TypeErrNum
		panic(fmt.Sprintf("seek file error:%s", err.Error()))
	}

	rd := bufio.NewReader(f)
	var pending []byte // bytes of a line whose '\n' has not been written yet
	for {
		chunk, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// ReadBytes returns whatever it read before EOF; keep it so a
			// line that is written in two steps is not split or lost.
			pending = append(pending, chunk...)
			time.Sleep(500 * time.Millisecond)
			continue
		}
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
		}

		line := chunk
		if len(pending) > 0 {
			line = append(pending, chunk...)
			pending = nil
		}
		TypeMonitorChan <- TypeHandleLine
		rc <- line[:len(line)-1] // err == nil guarantees a trailing '\n'
	}
}

// Write stores every message received on wc as one point of the
// "nginx_log" measurement. The DSN has the form
// addr@user@password@database@precision. Any failure terminates the
// process via log.Fatal.
func (w *WriteToInfluxDB) Write(wc chan *Message) {
	infSli := strings.Split(w.influxDBDsn, "@")
	if len(infSli) < 5 {
		TypeMonitorChan <- TypeErrNum
		log.Fatalf("invalid influx data source: got %d '@'-separated parts, want 5", len(infSli))
	}

	// Create a new HTTPClient.
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     infSli[0],
		Username: infSli[1],
		Password: infSli[2],
	})
	if err != nil {
		TypeMonitorChan <- TypeErrNum
		log.Fatal(err)
	}
	// The client is shared by every write in the loop below, so it is
	// closed once, when the channel is drained.
	defer c.Close()

	for v := range wc {
		// A fresh batch per message: reusing one batch would re-send every
		// previously added point on each write.
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  infSli[3],
			Precision: infSli[4],
		})
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Fatal(err)
		}

		tags := map[string]string{
			"Path":   v.Path,
			"Method": v.Method,
			"Scheme": v.Scheme,
			"Status": v.Status,
		}
		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}
		pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Fatal(err)
		}
		bp.AddPoint(pt)

		if err := c.Write(bp); err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Fatal(err)
		}
		log.Println("Write success")
	}
}

// Process parses every raw line received on l.rc and sends the resulting
// Message on l.wc. Lines that do not match the expected format are logged,
// counted as errors and skipped.
func (l *LogProcess) Process() {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// Hosts without a time zone database cannot load the location, and
		// parsing with a nil location panics; fall back to a fixed +08:00.
		log.Println("LoadLocation fail, using fixed +0800 offset:", err)
		loc = time.FixedZone("CST", 8*60*60)
	}

	for v := range l.rc {
		ret := accessLogRe.FindStringSubmatch(string(v))
		if len(ret) != 13 {
			TypeMonitorChan <- TypeErrNum
			log.Println("FindStringSubMatch fail:", string(v))
			continue
		}

		message := &Message{}

		// "+0800" is literal text in this layout, so only that offset is accepted.
		t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[4], loc)
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("ParseInLocation fail", err.Error(), ret[4])
			continue
		}
		message.TimeLocal = t

		// The pattern only admits digits here; on overflow the value stays 0.
		byteSent, _ := strconv.Atoi(ret[7])
		message.BytesSent = byteSent

		// The request field looks like "GET /foo?query=t HTTP/1.1".
		reqSli := strings.Split(ret[5], " ")
		if len(reqSli) != 3 {
			TypeMonitorChan <- TypeErrNum
			log.Println("strings.Split fail", ret[5])
			continue
		}
		message.Method = reqSli[0]

		u, err := url.Parse(reqSli[1])
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("url parse fail", err)
			continue
		}
		message.Path = u.Path

		// Scheme is intentionally left empty: the line carries no scheme field.
		message.Status = ret[6]

		// The pattern also admits values such as "-", which do not parse as
		// floats; those are stored as 0.
		upstreamTime, _ := strconv.ParseFloat(ret[11], 64)
		requestTime, _ := strconv.ParseFloat(ret[12], 64)
		message.UpstreamTime = upstreamTime
		message.RequestTime = requestTime

		l.wc <- message
	}
}

func main() {
	var path, influxDsn string
	flag.StringVar(&path, "path", "/usr/local/var/log/nginx/sd-mac-access-8081.log", "read file path")
	flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@wangxu@wangxu26@imooc@s", "influx data source")
	flag.Parse()

	r := &ReadFromFile{
		path: path,
	}
	w := &WriteToInfluxDB{
		influxDBDsn: influxDsn,
	}
	lp := &LogProcess{
		// Buffered channels so the stages do not block each other.
		rc:    make(chan []byte, 200),
		wc:    make(chan *Message, 200),
		read:  r,
		write: w,
	}

	// Exactly one reader: each Read call opens and tails the file on its
	// own, so a second reader would deliver every line a second time.
	go lp.read.Read(lp.rc)

	go lp.Process()

	// Writing is slower than reading, so run several writers.
	for i := 0; i < 4; i++ {
		go lp.write.Write(lp.wc)
	}

	m := &Monitor{
		startTime: time.Now(),
		data:      SystemInfo{},
	}
	m.start(lp)
}
查看全部 -
常见的并发模型
1,进程和线程模型,例如apache
2,异步非阻塞,例如nginx,nodejs
3,协程,例如golang,erlang,lua
查看全部 -
golang分析nginx日志
使用influxdb存储
使用grafana展示
查看全部 -
任务的合理拆分
查看全部 -
Influxdb关键概念
查看全部 -
Influxdb简介
查看全部 -
日志监控系统——写入模块
查看全部 -
日志监控系统——解析模块
查看全部 -
日志监控系统——读取模块的实现
查看全部 -
golang中的面向对象
查看全部 -
并发与并行
查看全部 -
golang并发实现——select
查看全部 -
golang并发实现——channels
查看全部
举报
0/150
提交
取消