2 回答
TA贡献1900条经验 获得超5个赞
使用以下内容将字符串通道转换为读取器:
type chanReader struct {
c chan string
buf string
}
func (r *chanReader) Read(p []byte) (int, error) {
// Fill the buffer when we have no data to return to the caller
if len(r.buf) == 0 {
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
}
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
像这样使用它:
r := csv.NewReader(&chanReader{c: feederChan})
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
如果应用程序假定换行符分隔从通道接收的值,则将换行符附加到每个接收到的值:
...
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
r.buf += "\n"
...
复制+= "\n"字符串。如果这不能满足应用程序的效率要求,则引入一个新字段来管理行分隔符。
type chanReader struct {
c chan string // source of lines
buf string // the current line
nl bool // true if line separator is pending
}
func (r *chanReader) Read(p []byte) (int, error) {
// Fill the buffer when we have no data to return to the caller
if len(r.buf) == 0 && !r.nl {
var ok bool
r.buf, ok = <-r.c
if !ok {
// Return eof on channel closed
return 0, io.EOF
}
r.nl = true
}
// Return data if we have it
if len(r.buf) > 0 {
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
// No data, return the line separator
n := copy(p, "\n")
r.nl = n == 0
return n, nil
}
另一种方法是按照问题评论中的建议,使用 io.Pipe 和 goroutine 将通道转换为 io.Reader。这种方法的第一步是:
var nl = []byte("\n")
func createChanReader(c chan string) io.Reader {
r, w := io.Pipe()
go func() {
defer w.Close()
for s := range c {
io.WriteString(w, s)
w.Write(nl)
}
}
}()
return r
}
像这样使用它:
r := csv.NewReader(createChanReader(feederChan))
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
当应用程序在将管道读取到 EOF 之前退出循环时, io.Pipe 解决方案的第一遍会泄漏 goroutine。应用程序可能会提前中断,因为 CSV 阅读器检测到语法错误,应用程序由于程序员错误或任何其他原因而崩溃。
要修复 goroutine 泄漏,请在写入错误时退出写入 goroutine,并在完成读取后关闭管道读取器。
var nl = []byte("\n")
func createChanReader(c chan string) *io.PipeReader {
r, w := io.Pipe()
go func() {
defer w.Close()
for s := range c {
if _, err := io.WriteString(w, s); err != nil {
return
}
if _, err := w.Write(nl); err != nil {
return
}
}
}()
return r
}
像这样使用它:
cr := createChanReader(feederChan)
defer cr.Close() // Required for goroutine cleanup
r := csv.NewReader(cr)
for {
a, err := r.Read()
if err != nil {
// handle error, break out of loop
}
// do something with a
}
TA贡献1825条经验 获得超4个赞
我最终还是使用了 io.Pipe() “正如 mh-cbon 提到的那样”,它更简单并且看起来更有效(如下所述):
rp, wp := io.Pipe()
go func() {
defer wp.Close()
for i := range feederChan {
fmt.Fprintln(wp, i)
}
}()
r := csv.NewReader(rp)
for { // keep reading
a, err := r.Read()
if err == io.EOF {
break
}
// do stuff with 'a'
// ...
}
io.Pipe() 是同步的,并且应该相当高效:它将数据从写入器通过管道传输到读取器;我将 csv.NewReader() 提供给读者部分,并创建了一个 goroutine,将 chan 写入到作者部分。
多谢。
- 2 回答
- 0 关注
- 91 浏览
添加回答
举报