为了账号安全,请及时绑定邮箱和手机立即绑定

io.Pipe() 导致 WaitGroup 卡住

io.Pipe() 导致 WaitGroup 卡住

Go
九州编程 2022-12-19 10:52:32
我正在处理一个巨大的数据文件,大约是。100 GB。这个巨大文件中的每一行都是一段 JSON 数据,我想读取、压缩这些数据并将其存储在内存数据库中。var wg sync.WaitGroupfor {    line, err := reader.ReadString('\n')    if err != nil {        break    }    go func(index int) {        wg.Add(1)        pr, pw := io.Pipe()        zw := lzw.NewWriter(pw, lzw.LSB, 8)        _, err := io.Copy(zw, strings.NewReader(line))        pw.Close()        zw.Close()        if err != nil {            fmt.Println(err.Error())        }        b, err := io.ReadAll(pr)        if err != nil {            fmt.Println(err.Error())        }        client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)        pr.Close()        wg.Done()    }(index)    if index%10000 == 0 {        fmt.Println(index)        wg.Wait()    }    index += 1}但是,此代码在处理前 10000 行后停止。当我向下移动时wg.Add(1),zw.Close()它继续处理该行的其余部分(但变得不稳定)。如果没有lzw,io.Pipe()当我尝试以未压缩的方式存储确切的值时,一切都可以正常工作。我不确定我是否没有WaitGroup正确使用 the 还是有一些io.Pipe()我还不知道的与 the 相关的东西。
查看完整描述

1 回答

?
心有法竹

TA贡献1866条经验 获得超5个赞

1-删除pr, pw := io.Pipe()使代码更简单,因为它是多余的,

试试这个:


line, err := reader.ReadString('\n')

if err == io.EOF {

    wg.Wait()

    break

}

if err != nil {

    log.Fatal(err)

}

wg.Add(1)

go func(index int) {

    var buf bytes.Buffer

    { // lexical scoping (static scoping)

        zw := lzw.NewWriter(&buf, lzw.LSB, 8)

        n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line))

        if err != nil {

            log.Fatal(err)

        }

        if int(n) != len(line) {

            log.Fatal(n, len(line))

        }

        // It is the caller's responsibility to call Close on the WriteCloser when finished writing.

        if err = zw.Close(); err != nil {

            log.Fatal(err)

        }

    }

    ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)

    client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour)


    cancelFunc()

    wg.Done()

}(index)


if index%tenThousand == 0 {

    wg.Wait()

}

2-你需要把wg.Add(1)之前go func(index int) {:


    wg.Add(1)

    go func(index int) {

3-wg.Wait()逻辑:


if index%10000 == 0 {

        fmt.Println(index)

        wg.Wait()

    }

如果 . 最后一次迭代会发生什么index%10000 != 0。所以在这里当err == io.EOF你需要wg.Wait()让所有的 goroutines 加入时:


if err == io.EOF {

    wg.Wait()

    fmt.Println("\n**** All done **** index =", index)

    break

}

4-您可以使用词法范围(静态范围)来限制一些变量范围并使代码更易于管理 - 并知道何时Close:lzw.NewWriter


{ // lexical scoping (static scoping)

    zw := lzw.NewWriter(bufio.NewWriter(&buf), lzw.LSB, 8)

    n, err := io.Copy(zw, strings.NewReader(line))

    if err != nil {

        log.Fatal(err)

    }

    if int(n) != len(line) {

        log.Fatal(n, len(line))

    }

    // It is the caller's responsibility to call Close on the WriteCloser when finished writing.

    if err = zw.Close(); err != nil {

        log.Fatal(err)

    }

}

5- 始终检查错误,例如:


 if err = zw.Close(); err != nil {

    log.Fatal(err)

}

这是接近你的代码的工作版本 - 试试这个只是为了试验并发逻辑看看会发生什么(不推荐因为它有多余的 goroutines 并且io.Pipe- 只是工作:


package main


import (

    "bufio"

    "compress/lzw"

    "context"

    "encoding/base64"

    "fmt"

    "io"

    "log"

    "strings"

    "sync"

    "time"

)


func main() {

    index := 0

    client := &myClient{}

    reader := bufio.NewReader(file)

    // your code:

    var wg sync.WaitGroup

    for {

        index++

        line, err := reader.ReadString('\n')

        if err != nil {

            msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now())

            wg.Wait() // break waiting // if index%tenThousand != 0

            break

        }

        wg.Add(1)

        go func(i int) {

            msg <- fmt.Sprint(i, " Enter running ... ", time.Now())

            asyncReader, asyncWriter := io.Pipe() // make it async to read and write

            zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8)

            go func() { // async

                _, err := io.Copy(zipWriter, strings.NewReader(line))

                if err != nil {

                    log.Fatal(err)

                }

                _ = zipWriter.Close()

                _ = asyncWriter.Close() // for io.ReadAll

            }()

            b, err := io.ReadAll(asyncReader)

            if err != nil {

                log.Fatal(err)

            }

            client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000)

            asyncReader.Close()

            time.Sleep(1 * time.Second)

            msg <- fmt.Sprint(i, " Exit running ... ", time.Now())

            wg.Done()

        }(index)


        msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call")

        if index%tenThousand == 0 {

            wg.Wait()

            msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now())

        }

    }

    msg <- "Bye forever."


    wg.Wait()

    close(msg)

    wgMsg.Wait()

}


// just for the Go Playground:

const tenThousand = 2


type myClient struct {

}


func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {

    // fmt.Println("a =", a, ", b =", b, ", t =", t)

    if ctx.Err() != nil {

        fmt.Println(ctx.Err())

    }

}


var file, myw = io.Pipe()


func init() {

    go func() {

        for i := 1; i <= tenThousand+1; i++ {

            fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i)

        }

        myw.Close()

    }()

    wgMsg.Add(1)

    go func() {

        defer wgMsg.Done()

        for s := range msg {

            fmt.Println(s)

        }

    }()

}


var msg = make(chan string, 100)

var wgMsg sync.WaitGroup


输出:


1 false after go call

2 true after go call

1 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001

2 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001

1 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

2 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

..2 Done waiting after go call. 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

3 false after go call

3 Enter running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

4 Done not waiting with err: EOF 2009-11-10 23:00:01 +0000 UTC m=+1.000000001

3 Exit running ... 2009-11-10 23:00:02 +0000 UTC m=+2.000000001

Bye forever.


查看完整回答
反对 回复 2022-12-19
  • 1 回答
  • 0 关注
  • 100 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信