1 回答
TA贡献1851条经验 获得超5个赞
您绝对应该使用渠道来管理您处理过的行。或者,您也可以编写另一个 goroutine 来处理您的输出。
var numGoWriters = 10
func processRow(r Row, ch chan<- string) {
res := process(r)
ch <- res
}
func writeRow(f File, ch <-chan string) {
w := bufio.NewWriter(f)
for s := range ch {
_, err := w.WriteString(s + "\n")
}
func processFile(f File) {
outFile, err := os.Create("/path/to/file.out")
if err != nil {
// handle it
}
defer outFile.Close()
var wg sync.WaitGroup
ch := make(chan string, 10) // play with this number for performance
defer close(ch) // once we're done processing rows, we close the channel
// so our worker threads exit
fScanner := bufio.NewScanner(f)
for fScanner.Scan() {
wg.Add(1)
go func() {
processRow(fScanner.Text(), ch)
wg.Done()
}()
}
for i := 0; i < numGoWriters; i++ {
go writeRow(outFile, ch)
}
wg.Wait()
}
在这里,我们已经完成processRow了所有的处理(我假设string),完成writeRow了所有的 I/O,processFile并将每个文件绑定在一起。然后所有main要做的就是移交文件,产生 goroutines,等等。
func main() {
var wg sync.WaitGroup
filenames := [...]string{"here", "are", "some", "log", "paths"}
for fname := range filenames {
inFile, err := os.Open(fname)
if err != nil {
// handle it
}
defer inFile.Close()
wg.Add(1)
go processFile(inFile)
}
wg.Wait()
- 1 回答
- 0 关注
- 210 浏览
添加回答
举报