为了账号安全,请及时绑定邮箱和手机立即绑定

Golang:同时处理 5 个大文件

Golang:同时处理 5 个大文件

Go
烙印99 2021-11-29 16:46:37
我目前在 Perl 中处理了 5 个巨大的(每个 400 万行)日志文件,我想我可以尝试在 Go 及其并发功能中实现相同的功能。因此,由于对 Go 非常缺乏经验,我正在考虑按以下方式进行操作。对这种方法的任何评论将不胜感激。一些粗略的伪代码:var wg1 sync.WaitGroupvar wg2 sync.WaitGroupfunc processRow (r Row) {    wg2.Add(1)    defer wg2.Done()    res = <process r>    return res}func processFile(f File) {    wg1.Add(1)    open(newfile File)    defer wg1.Done()    line = <row from f>    result = go processRow(line)    newFile.Println(result) // Write new processed line to newFile    wg2.Wait()    newFile.Close()}func main() {    for each f logfile {        go processFile(f)    }    wg1.Wait()}所以,想法是我同时处理这 5 个文件,然后每个文件的所有行也将依次同时处理。那行得通吗?
查看完整描述

1 回答

?
江户川乱折腾

TA贡献1851条经验 获得超5个赞

您绝对应该使用渠道来管理您处理过的行。或者,您也可以编写另一个 goroutine 来处理您的输出。


var numGoWriters = 10


func processRow(r Row, ch chan<- string) {

    res := process(r)

    ch <- res

}


func writeRow(f File, ch <-chan string) {

    w := bufio.NewWriter(f)

    for s := range ch {

        _, err := w.WriteString(s + "\n")

    }


func processFile(f File) {

    outFile, err := os.Create("/path/to/file.out")

    if err != nil {

        // handle it

    }

    defer outFile.Close()

    var wg sync.WaitGroup

    ch := make(chan string, 10)  // play with this number for performance

    defer close(ch) // once we're done processing rows, we close the channel

                    // so our worker threads exit

    fScanner := bufio.NewScanner(f)

    for fScanner.Scan() {

        wg.Add(1)

        go func() {

            processRow(fScanner.Text(), ch)

            wg.Done()

        }()

    }

    for i := 0; i < numGoWriters; i++ {

        go writeRow(outFile, ch)

    }

    wg.Wait()  

}

在这里,我们已经完成processRow了所有的处理(我假设string),完成writeRow了所有的 I/O,processFile并将每个文件绑定在一起。然后所有main要做的就是移交文件,产生 goroutines,等等。


func main() {

    var wg sync.WaitGroup


    filenames := [...]string{"here", "are", "some", "log", "paths"}

    for fname := range filenames {

        inFile, err := os.Open(fname)

        if err != nil {

            // handle it

        }

        defer inFile.Close()

        wg.Add(1)

        go processFile(inFile)

    }

    wg.Wait()


查看完整回答
反对 回复 2021-11-29
  • 1 回答
  • 0 关注
  • 210 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信