为了账号安全,请及时绑定邮箱和手机立即绑定

将 csv.Reader() 用于“chan 字符串”的有效方法

将 csv.Reader() 用于“chan 字符串”的有效方法

Go
胡子哥哥 2023-04-04 16:58:24
我有一个“chan string”,其中每个条目都是一个 CSV 日志行,我想将其转换为列“[]string”,目前我正在(效率低下)创建一个 csv.NewReader(strings.NewReader(i) ) 对于每个项目,这看起来比实际需要的工作要多得多:for i := range feederChan {    r := csv.NewReader(strings.NewReader(i))    a, err := r.Read()    if err != nil {         // log error...         continue    }    // then do stuff with 'a'    // ...}所以,如果有更有效的方法来做到这一点,我真的很感激分享,比如创建 csv.Reader 一次,然后以某种方式向它提供 chan 内容(将“chan”内容流式传输到实现“io.Reader”接口的东西?)。
查看完整描述

2 回答

?
梵蒂冈之花

TA贡献1900条经验 获得超5个赞

使用以下内容将字符串通道转换为读取器:


type chanReader struct {

    c   chan string

    buf string

}


func (r *chanReader) Read(p []byte) (int, error) {


    // Fill the buffer when we have no data to return to the caller

    if len(r.buf) == 0 {

        var ok bool

        r.buf, ok = <-r.c

        if !ok {

            // Return eof on channel closed

            return 0, io.EOF

        }

    }


    n := copy(p, r.buf)

    r.buf = r.buf[n:]

    return n, nil

}

像这样使用它:


r := csv.NewReader(&chanReader{c: feederChan})

for {

    a, err := r.Read()

    if err != nil {

        // handle error, break out of loop

    }

    // do something with a

}

如果应用程序假定换行符分隔从通道接收的值,则将换行符附加到每个接收到的值:


        ...

        var ok bool

        r.buf, ok = <-r.c

        if !ok {

            // Return eof on channel closed

            return 0, io.EOF

        }

        r.buf += "\n"

        ...

复制+= "\n"字符串。如果这不能满足应用程序的效率要求,则引入一个新字段来管理行分隔符。


type chanReader struct {

    c chan string  // source of lines

    buf string     // the current line

    nl bool        // true if line separator is pending

}


func (r *chanReader) Read(p []byte) (int, error) {


    // Fill the buffer when we have no data to return to the caller

    if len(r.buf) == 0 && !r.nl {

        var ok bool

        r.buf, ok = <-r.c

        if !ok {

            // Return eof on channel closed

            return 0, io.EOF

        }

        r.nl = true

    }


    // Return data if we have it

    if len(r.buf) > 0 {

        n := copy(p, r.buf)

        r.buf = r.buf[n:]

        return n, nil

    }


    // No data, return the line separator

    n := copy(p, "\n")

    r.nl = n == 0

    return n, nil

}

另一种方法是按照问题评论中的建议,使用 io.Pipe 和 goroutine 将通道转换为 io.Reader。这种方法的第一步是:


var nl = []byte("\n")


func createChanReader(c chan string) io.Reader {

    r, w := io.Pipe()

    go func() {

        defer w.Close()

        for s := range c {

            io.WriteString(w, s)

            w.Write(nl)

            }

        }

    }()

    return r

}

像这样使用它:


r := csv.NewReader(createChanReader(feederChan))

for {

    a, err := r.Read()

    if err != nil {

        // handle error, break out of loop

    }

    // do something with a

}

当应用程序在将管道读取到 EOF 之前退出循环时, io.Pipe 解决方案的第一遍会泄漏 goroutine。应用程序可能会提前中断,因为 CSV 阅读器检测到语法错误,应用程序由于程序员错误或任何其他原因而崩溃。


要修复 goroutine 泄漏,请在写入错误时退出写入 goroutine,并在完成读取后关闭管道读取器。


var nl = []byte("\n")


func createChanReader(c chan string) *io.PipeReader {

    r, w := io.Pipe()

    go func() {

        defer w.Close()

        for s := range c {

            if _, err := io.WriteString(w, s); err != nil {

                return

            }

            if _, err := w.Write(nl); err != nil {

                return

            }

        }

    }()

    return r

}

像这样使用它:


cr := createChanReader(feederChan)

defer cr.Close() // Required for goroutine cleanup

r := csv.NewReader(cr)

for {

    a, err := r.Read()

    if err != nil {

        // handle error, break out of loop

    }

    // do something with a

}


查看完整回答
反对 回复 2023-04-04
?
凤凰求蛊

TA贡献1825条经验 获得超4个赞

我最终还是使用了 io.Pipe() “正如 mh-cbon 提到的那样”,它更简单并且看起来更有效(如下所述):


rp, wp := io.Pipe()

go func() {

    defer wp.Close()

    for i := range feederChan {

        fmt.Fprintln(wp, i)

    }

}()


r := csv.NewReader(rp)

for { // keep reading

    a, err := r.Read()

    if err == io.EOF {

        break

    }

    // do stuff with 'a'

    // ...

}

io.Pipe() 是同步的,并且应该相当高效:它将数据从写入器通过管道传输到读取器;我将 csv.NewReader() 提供给读者部分,并创建了一个 goroutine,将 chan 写入到作者部分。


多谢。


查看完整回答
反对 回复 2023-04-04
  • 2 回答
  • 0 关注
  • 91 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信