package main
import (
"fmt"
"strings"
"time"
"os"
"bufio"
"io"
)
// interface
type Reader interface {
Read(rc chan string)
}
type Writer interface {
Write(wc chan string)
}
type LogProcess struct {
rc chan string // in file get message
wc chan string // out message in writer
read Reader
write Writer
}
func (l *LogProcess) Process() {
for v := range l.rc {
l.wc <- strings.ToUpper(string(v))
}
}
type ReadFromFile struct {
path string // file storage path
}
/*
1. 读取模块
a. 打开文件
b. 从文件 末尾 开始逐行读取
*/
func (r *ReadFromFile) Read(rc chan string){
// line := "string in message"
file, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error: %s", err.Error()))
}
defer file.Close()
// 从文件末尾开始逐行读取
file.Seek(0, 2)
rd := bufio.NewReader(file)
for {
// why is not use method rd.ReadString()
line, err := rd.ReadString('\n')
if err != nil {
if err == io.EOF {
time.Sleep(500 * time.Microsecond)
continue
}
panic(fmt.Sprintf("ReadString error: %s", err.Error()))
}
//rc <- line[:len(line)-1]
rc <- strings.TrimSpace(line)
}
}
type WriterToinfluxDB struct {
influxDBDsn string // influxDB dsn
}
func (w *WriterToinfluxDB) Write(wc chan string) {
for v := range wc {
fmt.Println(v)
}
}
func main() {
r := &ReadFromFile{
path: "/tmp/access.log",
}
w := &WriterToinfluxDB{
influxDBDsn: "username$password",
}
lp := &LogProcess{
rc: make(chan string),
wc: make(chan string),
read: r,
write: w,
}
go lp.read.Read(lp.rc)
go lp.Process()
go lp.write.Write(lp.wc)
time.Sleep( 30 * time.Second)
}