为了账号安全,请及时绑定邮箱和手机立即绑定

没有收到来自频道的消息

没有收到来自频道的消息

Go
呼如林 2023-03-29 15:54:24
编辑:在我添加了一小部分我正在使用的文件(7 GB)并尝试运行该程序后,我可以看到:fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:main.main()    /media/developer/golang/manual/examples/sp/v2/sp.v2.go:71 +0x4a9exit status 2情况:我是 GO 的新手,所以如果我的问题真的很简单,我很抱歉。我正在尝试流式传输xml文件、拆分文档,然后在不同的 GO 例程中解析它们。我正在使用的 XML 文件示例:<?xml version="1.0" encoding="UTF-8"?><osm version="0.6" generator="CGImap 0.0.2">    <relation id="56688" user="kmvar" uid="56190" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">        <member type="node" ref="294942404" role=""/>        <member type="node" ref="364933006" role=""/>        <tag k="name" v="Küstenbus Linie 123"/>        <tag k="network" v="VVW"/>        <tag k="route" v="bus"/>        <tag k="type" v="route"/>    </relation>    <relation id="98367" user="jdifh" uid="92834" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">        <member type="node" ref="294942404" role=""/>        <member type="way" ref="4579143" role=""/>        <member type="node" ref="249673494" role=""/>        <tag k="name" v="Küstenbus Linie 123"/>        <tag k="network" v="VVW"/>        <tag k="operator" v="Regionalverkehr Küste"/>        <tag k="ref" v="123"/>    </relation>    <relation id="72947" user="schsu" uid="92374" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">        <member type="node" ref="294942404" role=""/>        <tag k="name" v="Küstenbus Linie 123"/>        <tag k="type" v="route"/>    </relation></osm>它应该打印每个结构,但我什么也没看到。该程序只是挂起。我测试了 streamer 和 splitter(只是在将消息发送到通道之前添加fmt.Println(rs)到函数parseRelation中)。我可以看到结构。因此,问题在于发送和接收消息。问题:我不知道如何解决这个问题。尝试更改频道中消息的类型(从RS到string)并且每次只发送一个字符串。但它也没有帮助(我什么也看不到)
查看完整描述

2 回答

?
呼唤远方

TA贡献1856条经验 获得超11个赞

首先,让我们解决这个问题:您不能逐行解析 XML。您很幸运,您的文件恰好是每行一个标签,但这不能想当然。您必须解析整个 XML 文档。

通过逐行处理,您试图将<tag>and<member>推入为<relation>. 相反,使用xml.NewDecoder并让它为您处理文件。

package main


import (

    "encoding/xml"

    "fmt"

    "os"

    "log"

)


type Osm struct {

    XMLName     xml.Name    `xml:"osm"`

    Relations   []Relation  `xml:"relation"`

}

type Relation struct {

    XMLName     xml.Name    `xml:"relation"`

    ID          string      `xml:"id,attr"`

    User        string      `xml:"user,attr"`

    Uid         string      `xml:"uid,attr"`

    Members     []Member    `xml:"member"`

    Tags        []Tag       `xml:"tag"`

}

type Member struct {

    XMLName     xml.Name    `xml:"member"`

    Ref         string      `xml:"ref,attr"`

    Type        string      `xml:"type,attr"`

    Role        string      `xml:"role,attr"`

}

type Tag struct {

    XMLName     xml.Name    `xml:"tag"`

    Key         string      `xml:"k,attr"`

    Value       string      `xml:"v,attr"`

}


func main() {

    reader, err := os.Open("test.xml")

    if err != nil {

        log.Fatal(err)

    }

    defer reader.Close()


    decoder := xml.NewDecoder(reader)


    osm := &Osm{}

    err = decoder.Decode(&osm)

    if err != nil {

        log.Fatal(err)

    }

    fmt.Println(osm)

}

Osm其他结构类似于您期望的 XML 模式。decoder.Decode(&osm)应用该架构。

答案的其余部分将仅涵盖通道和 goroutines 的使用。XML 部分将被删除。


如果你添加一些调试语句,你会发现它parseRelation从未被调用,这意味着它channel是空的并且fmt.Println(<- channel)等待一个永远不会关闭的空通道。所以一旦你完成处理,关闭通道。

  for {

    p2Rc, err := reader.ReadSlice('\n')


    ...

  }

  close(channel)

现在我们得到{ [] []}5973974 次。


for i := 0; i < 5973974; i++ {

  fmt.Println(<- channel)

}

这是尝试从通道读取 5973974 次。这违背了渠道的意义。相反,使用range.


for thing := range channel {

    fmt.Println(thing)

}

现在至少它完成了!


但是有一个新问题。如果它真的找到了一个东西,比如如果你改成if p2Rc[2] == 114 {,if p2Rc[2] == 32 {你会得到一个panic: send on closed channel. 这是因为parseRelation与阅读器并行运行,并且可能会在主要阅读代码完成并关闭通道后尝试写入。在关闭频道之前,您必须确保使用该频道的每个人都已完成。


要解决此问题,需要进行相当大的重新设计。


下面是一个简单程序的示例,该程序从文件中读取行,将它们放入通道,并让工作人员从该通道读取。


func main() {

    reader, err := os.Open("test.xml")

    if err != nil {

        log.Fatal(err)

    }

    defer reader.Close()


    // Use the simpler bufio.Scanner

    scanner := bufio.NewScanner(reader)


    // A buffered channel for input

    lines := make(chan string, 10)


    // Work on the lines

    go func() {

        for line := range lines {

            fmt.Println(line)

        }

    }()


    // Read lines into the channel

    for scanner.Scan() {

        lines <- scanner.Text()

    }

    if err := scanner.Err(); err != nil {

        log.Fatal(err)

    }


    // When main exits, channels gracefully close.

}

这很好用,因为main它很特殊并且在退出时会清理通道。但是如果 reader 和 writer 都是 goroutines 呢?


// A buffered channel for input

lines := make(chan string, 10)


// Work on the lines

go func() {

    for line := range lines {

        fmt.Println(line)

    }

}()


// Read lines into the channel

go func() {

    for scanner.Scan() {

        lines <- scanner.Text()

    }

    if err := scanner.Err(); err != nil {

        log.Fatal(err)

    }

}()

空的。main在 goroutines 可以完成他们的工作之前退出并关闭通道。我们需要一种方法让我们知道main要等到处理完成。有几种方法可以做到这一点。一个是与另一个通道同步处理。


// A buffered channel for input

lines := make(chan string, 10)


// A channel for main to wait for

done := make(chan bool, 1)


// Work on the lines

go func() {

    for line := range lines {

        fmt.Println(line)

    }


    // Indicate the worker is done

    done <- true

}()


// Read lines into the channel

go func() {

    // Explicitly close `lines` when we're done so the workers can finish

    defer close(lines)


    for scanner.Scan() {

        lines <- scanner.Text()

    }

    if err := scanner.Err(); err != nil {

        log.Fatal(err)

    }

}()


// main will wait until there's something to read from `done`

<-done

现在main将触发 reader 和 worker goroutines 并缓冲等待done. 阅读器将填充lines直到完成阅读,然后将其关闭。同时,工作人员将在完成读取后读取lines和写入。done


另一种选择是使用sync.WaitGroup。


// A buffered channel for input

lines := make(chan string, 10)


var wg sync.WaitGroup


// Note that there is one more thing to wait for

wg.Add(1)

go func() {

    // Tell the WaitGroup we're done

    defer wg.Done()


    for line := range lines {

        fmt.Println(line)

    }

}()


// Read lines into the channel

go func() {

    defer close(lines)


    for scanner.Scan() {

        lines <- scanner.Text()

    }

    if err := scanner.Err(); err != nil {

        log.Fatal(err)

    }

}()


// Wait until everything in the WaitGroup is done

wg.Wait()

和以前一样,main启动 reader 和 worker goroutine,但现在它在启动 worker 之前将 1 添加到 WaitGroup。然后等待wg.Wait()返回。阅读器与以前一样工作,lines完成后关闭通道。工作人员现在wg.Done()在完成递减 WaitGroup 的计数并允许wg.Wait()返回时调用。


每种技术都有优点和缺点。done更灵活,链条更好,如果你能把头绕在它身上会更安全。WaitGroups 更简单,更容易理解,但要求每个 goroutine 共享一个变量。


如果我们想添加到这个处理链中,我们可以这样做。假设我们有一个读取行的 goroutine,一个在 XML 元素中处理它们,一个对元素执行某些操作。


// A buffered channel for input

lines := make(chan []byte, 10)

elements := make(chan *RS)


var wg sync.WaitGroup


// Worker goroutine, turn lines into RS structs

wg.Add(1)

go func() {

    defer wg.Done()

    defer close(elements)


    for line := range lines {

        if line[2] == 32 {

            fmt.Println("Begin")

            fmt.Println(string(line))

            fmt.Println("End")


            rs := &RS{}

            xml.Unmarshal(line, &rs)

            elements <- rs

        }

    }

}()


// File reader

go func() {

    defer close(lines)


    for scanner.Scan() {

        lines <- scanner.Bytes()

    }

    if err := scanner.Err(); err != nil {

        log.Fatal(err)

    }

}()


// Element reader

wg.Add(1)

go func() {

    defer wg.Done()


    for element := range elements {

        fmt.Println(element)

    }

}()


wg.Wait()

这会产生空结构,因为您正试图将单独的 XML 行放入表示完整标记的结构中<relationship>。但它演示了如何向链中添加更多工人。


查看完整回答
反对 回复 2023-03-29
?
芜湖不芜

TA贡献1796条经验 获得超7个赞

我不知道这是否对您有帮助,但您的条件if p2Rc[2]==114永远不会满足,然后您继续并开始收听频道。从不接收输入。

我认为这里的主要问题是这个(代码)是做什么的?上述条件属于该过程的何处?如果这很清楚,我可以更新一个更好的回应。


查看完整回答
反对 回复 2023-03-29
  • 2 回答
  • 0 关注
  • 98 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信