package main
import (
"fmt"
"strings"
"time"
"os"
"bufio"
"io"
"regexp"
"log"
"strconv"
)
// Reader is the input side of the pipeline. An implementation pushes raw log
// lines, one string per line, into the supplied channel.
type Reader interface {
	// Read delivers log lines on rc.
	Read(rc chan string)
}
// Writer is the output side of the pipeline. An implementation consumes the
// processed records arriving on the supplied channel.
type Writer interface {
	// Write drains records from wc.
	Write(wc chan interface{})
}
// LogProcess holds the two channels and the two collaborators of the
// pipeline: the Reader feeds rc, Process turns entries from rc into records
// on wc, and the Writer drains wc.
type LogProcess struct {
	rc    chan string      // raw log lines coming from the reader
	wc    chan interface{} // parsed records going to the writer
	read  Reader           // source of log lines
	write Writer           // sink for parsed records
}
// Message is one parsed access-log entry as produced by Process.
type Message struct {
	IP      string    // address at the start of the log line
	Logtime time.Time // timestamp taken from the bracketed field
	Url     string    // quoted request line, e.g. "POST /path HTTP/1.1"
	Code    int       // numeric status code
	Length  float64   // numeric field following the status code
	Refer   string    // quoted referer field
	Client  string    // quoted client (user-agent) field
}
// Process is the parsing stage of the pipeline:
//
//  1. read each log line from the l.rc channel,
//  2. extract the required fields with a regular expression,
//  3. send the resulting *Message to the l.wc channel.
//
// A line that does not match the pattern, or whose timestamp, status code or
// length cannot be converted, is logged and skipped; nothing is sent for it.
// Process returns once l.rc is closed and drained. It does not close l.wc.
func (l *LogProcess) Process() {
	/*
		nginx log format
		192.168.252.210 - - [03/Nov/2016:16:56:47 +0800] "POST /jsrpc.php?output=json-rpc HTTP/1.1" 200 149 "http://ip:port/zabbix.php?action=dashboard.view" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36(KHTML, like Gecko) Maxthon/4.9.3.1000 Chrome/39.0.2146.0 Safari/537.36"
		grok format
		(?m)(?<ip>[\d+.]+)\s+(?<drop>[^\[]+)\s+\[(?<logtime>[^\]]+)\]\s+\"(?<url>[^"]+)\"\s+(?<code>\d+)\s+(?<length>\d+)\s+\"(?<refer>[^"]+)\"\s+\"(?<client>[^"]+)
	*/
	// Compiled once per call, before the loop. Capture groups:
	// 1 ip, 2 dropped, 3 logtime, 4 url, 5 code, 6 length, 7 refer, 8 client.
	rex := regexp.MustCompile(`([\d+.]+)\s+([^\[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d+)\s+(\d+)\s+\"([^"]+)\"\s+\"([^"]+)`)

	// Time zone used when interpreting the log timestamp. The layout below
	// carries an explicit numeric offset, so falling back to UTC when the
	// zone database is unavailable still yields the correct instant.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		log.Println("LoadLocation fail:", err.Error())
		loc = time.UTC
	}

	for v := range l.rc {
		// A successful match yields the whole match plus the 8 groups.
		ret := rex.FindStringSubmatch(v)
		if len(ret) != 9 {
			log.Println("match is error", v)
			continue
		}

		// The log format is 03/Nov/2016:16:56:47 +0800, so the layout must end
		// with the numeric zone "-0700" (as in RFC1123Z). With "+0000" in the
		// layout instead, parsing fails with:
		// ParseInLocation fail: parsing time "03/Nov/2016:16:56:47 +0800" as "02/Jan/2006:15:04:05 +0000": cannot parse "800" as " +0000"
		t, err := time.ParseInLocation("02/Jan/2006:15:04:05 -0700", ret[3], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[3])
			// Do not forward a record carrying a zero timestamp.
			continue
		}

		// The pattern only admits digits here, so these conversions can fail
		// only for values that are out of range.
		code, err := strconv.Atoi(ret[5])
		if err != nil {
			log.Println("Atoi fail:", err.Error(), ret[5])
			continue
		}
		length, err := strconv.ParseFloat(ret[6], 64)
		if err != nil {
			log.Println("ParseFloat fail:", err.Error(), ret[6])
			continue
		}

		l.wc <- &Message{
			IP:      ret[1],
			Logtime: t,
			Url:     ret[4],
			Code:    code,
			Length:  length,
			Refer:   ret[7],
			Client:  ret[8],
		}
	}
}
// ReadFromFile is a Reader whose log lines come from a file on disk.
type ReadFromFile struct {
	// path is the location of the log file to read.
	path string
}
// Read is the reading stage of the pipeline:
//
//  1. open the file at r.path,
//  2. starting from the END of the file, read it line by line and send each
//     line, with surrounding whitespace trimmed, to rc.
//
// When the end of the file is reached Read waits and polls again, so lines
// appended later are picked up. It never returns normally; it panics if the
// file cannot be opened or positioned, or on a read error other than io.EOF.
func (r *ReadFromFile) Read(rc chan string) {
	// Delay between polls once the end of the file has been reached.
	const pollInterval = 500 * time.Millisecond

	file, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error: %s", err.Error()))
	}
	defer file.Close()

	// Skip the existing content: only lines appended from now on are read.
	if _, err := file.Seek(0, io.SeekEnd); err != nil {
		panic(fmt.Sprintf("seek file error: %s", err.Error()))
	}

	rd := bufio.NewReader(file)
	// pending holds the start of a line whose newline has not arrived yet.
	var pending string
	for {
		chunk, err := rd.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				// ReadString returns whatever it read before the error; keep
				// it so a line written in several steps is not truncated.
				pending += chunk
				time.Sleep(pollInterval)
				continue
			}
			panic(fmt.Sprintf("ReadString error: %s", err.Error()))
		}
		// TrimSpace removes the trailing newline (and any "\r") as well.
		rc <- strings.TrimSpace(pending + chunk)
		pending = ""
	}
}
// WriterToinfluxDB is the Writer used by main. It carries the connection
// settings for an InfluxDB target.
type WriterToinfluxDB struct {
	// influxDBDsn is the InfluxDB data source name.
	influxDBDsn string
}
// Write receives records from wc and prints each one on its own line to
// standard output. It returns once wc has been closed and drained.
func (w *WriterToinfluxDB) Write(wc chan interface{}) {
	for {
		record, open := <-wc
		if !open {
			return
		}
		fmt.Println(record)
	}
}
func main() {
r := &ReadFromFile{
path: "/tmp/access.log",
}
w := &WriterToinfluxDB{
influxDBDsn: "username$password",
}
lp := &LogProcess{
rc: make(chan string),
wc: make(chan interface{}),
read: r,
write: w,
}
go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)
time.Sleep( 30 * time.Second)
}