为了账号安全,请及时绑定邮箱和手机立即绑定

使用通道转到管道

使用通道转到管道

Go
侃侃无极 2021-11-22 19:52:24
我正在探索 Go 并尝试使用通道建立一种管道。我只想在 main() 中读取一些内容并将它们发送到 process() 进行处理,在这种情况下只需将值打印到屏幕上。不幸的是,在下面的代码中, process() 似乎从不从通道读取,或者至少它不打印任何内容;我究竟做错了什么?package mainimport ( "fmt" ; "database/sql" ; _ "github.com/lib/pq" ; "time" ; "gopkg.in/redis.v3" )//; "strconv" )type Record struct {    userId, myDate int    prodUrl string}func main(){    //connect to db    db, err := sql.Open(...)    defer db.Close()    //error check here...    //exec query    rows, err := db.Query("select userID,url,date from mytable limit 10")    defer rows.Close()    //error check here...       //create channel to buffer rows read    bufferChan := make(chan *Record,1000)    go process(bufferChan)    //iterate through results and send them to process()    row := new(Record)    for rows.Next(){        err := rows.Scan(&row.userId, &row.prodUrl, &row.myDate)                bufferChan <- row        fmt.Printf("row sent %v",row.userId)                        }   }//prints Record valuesfunc process (buffer chan *Record) {    row := <- buffer    fmt.Printf("row received: %d %v %d ", row.userId,row.prodUrl,row.myDate)}
查看完整描述

3 回答

?
手掌心

TA贡献1942条经验 获得超3个赞

func 进程不打印任何内容的原因是您 func main 在行的 for 循环后退出。Next 完成从而退出程序。你需要做几件事。

  1. 在 for 循环之后添加对 close 的调用以指示结束向缓冲通道添加消息,否则可能导致死锁。所以调用 close(bufferChan)

  2. 使用 range 在您的 func 进程中迭代通道。

  3. 将额外的通道传递给进程以了解它何时完成,以便 main 可以等到进程完成。

看看下面的代码片段,例如:

package main


import "fmt"


func main() {

    bufferChan := make(chan int, 1000)

    done := make(chan bool)

    go process(bufferChan, done)

    for i := 0; i < 100; i++ {

        bufferChan <- i

    }

    close(bufferChan)


    select {

    case <-done:

        fmt.Println("Done")

    }


}


func process(c chan int, done chan bool) {

    for s := range c {

        fmt.Println(s)

    }   

    done <- true


}


查看完整回答
反对 回复 2021-11-22
?
qq_遁去的一_1

TA贡献1725条经验 获得超7个赞

您的 main 函数退出,因此整个程序结束。它应该等待处理结束。此外,进程函数应该使用 range 关键字在通道上循环。


工作解决方案的脚手架如下所示:


package main


import "fmt"


func process(input chan int, done chan struct{}) {

    for i := range input {

        fmt.Println(i)

    }

    done <- struct{}{}

}


func main() {

    input := make(chan int)

    done := make(chan struct{})


    go process(input, done)


    for i := 1; i < 10; i++ {

        input <- i

    }

    close(input)


    <-done

}


查看完整回答
反对 回复 2021-11-22
?
汪汪一只猫

TA贡献1898条经验 获得超8个赞

我相信您正在寻找io.pipe()go API,它在写入器和读取器之间创建同步内存管道。这里没有缓冲。它可用于将需要 的代码io.Reader与需要 的代码连接起来io.Writer。


在您的情况下,io.PipeWriter代码是“从数据库中读取值”,而“io.PipeReader”是“将值写入屏幕”的代码。


这里是一个没有任何缓冲区的流数据示例,即bytes.Buffer.


// Set up the pipe to write data directly into the Reader.

pr, pw := io.Pipe()

// Write JSON-encoded data to the Writer end of the pipe.

// Write in a separate concurrent goroutine, and remember

// to Close the PipeWriter, to signal to the paired PipeReader

// that we’re done writing.

go func() {

  err := json.NewEncoder(pw).Encode(&v)

  pw.Close()

}()

// Send the HTTP request. Whatever is read from the Reader

// will be sent in the request body.

// As data is written to the Writer, it will be available

// to read from the Reader.

resp, err := http.Post(“example.com”, “application/json”, pr)

参考:


https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5


查看完整回答
反对 回复 2021-11-22
  • 3 回答
  • 0 关注
  • 127 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信