为了账号安全,请及时绑定邮箱和手机立即绑定

Golang并发写入多个文件

Golang并发写入多个文件

Go
MMTTMM 2023-02-21 13:17:10
我需要同时写入多个文件,但遇到了一个奇怪的现象:并发执行的 goroutine 太多时,写入时间会增加。我用下面这段示例代码重现这个现象:

package main

import (
    "fmt"
    "log"
    "time"
    "math/rand"
    "sync"
    "os"
    "io/ioutil"
)

var wg sync.WaitGroup
var mu sync.Mutex

func WriteToFile(data []byte, fileName string) error {
    mu.Lock()
    err := ioutil.WriteFile(fileName, data, 0666)
    if err != nil {
        return err
    }
    return nil
}

func GenerateFile(index int) error {
    defer wg.Done()
    start := time.Now()
    elapsed := time.Since(start)
    buf := make([]byte, 7500000)
    rand.Read(buf) // generate random data
    randomFileName := fmt.Sprintf("/tmp/gotest-%v", rand.Int())
    err := WriteToFile(buf, randomFileName)
    if err != nil {
        return err
    }
    defer os.Remove(randomFileName)
    elapsed = time.Since(start)
    log.Printf("goroutine id: %v, generate file %s done in %s", index, randomFileName, elapsed)
    return nil
}

func main() {
    start := time.Now()
    for i := 0; i < 30; i++ {
        wg.Add(1)
        go GenerateFile(i)
    }
    wg.Wait()
    elapsed := time.Since(start)
    log.Printf("done in %s", elapsed)
}

我原本期望所有 goroutine 大致同时完成,因为它们是并发启动的;但实际上后面的 goroutine 比第一个 goroutine 花费的时间多得多。如果我删除数据生成和文件写入部分,所有 goroutine 会同时返回。我也尝试过工作池(worker pool),但如果 worker 太多,总时间仍然会增加。我不明白这种行为。有人可以给我一个解释吗?
查看完整描述

1 回答

?
慕桂英546537

TA贡献1848条经验 获得超10个赞

实际上,Go 确实是并行写入磁盘的。问题中出现顺序行为的原因在于 math/rand:该包的顶层 Read 函数使用一个全局随机数生成器,并通过内部互斥锁保证线程安全,请参阅 globalRnd 和 lockedSource。

这就是为什么你的 GenerateFile goroutine 几乎严格地一个接一个地运行——它们都在 rand.globalRnd.lk 互斥锁上进行内部同步。

有两种方法可以提高性能。一种是在每个线程中使用独立的PRNG,另一种是预先生成写入数据。

这是一个尝试所有变体的示例程序。

package main


import (
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"os"
	"runtime"
	"sort"
	"sync"
	"time"
)


// wg counts the writer goroutines that are still running. Each Run* function
// calls wg.Add(1) before starting a worker, and every worker calls wg.Done
// when it returns.
var wg sync.WaitGroup


// N is the number of goroutines (and therefore files) each Run* function
// starts, and the length of elapsed_g.
const N = 30


// elapsed_g holds one duration per worker, stored at the index given by the
// worker's id argument. SortAndLogElapsed sorts it in place, so after a run
// an index no longer identifies the worker that produced the value.
var elapsed_g [N]time.Duration


func SortAndLogElapsed(prefix string) {

    sort.Slice(elapsed_g[:], func(i, j int) bool { return elapsed_g[i].Nanoseconds() < int64(elapsed_g[j].Nanoseconds()) })

    for _, elapsed := range elapsed_g {

        fmt.Println(prefix, elapsed)

    }

}


func GenerateFile(start time.Time, id int) error {

    defer wg.Done()

    elapsed := time.Since(start)

    buf := make([]byte, 7500000)

    rand.Read(buf) // generate random data

    randomFileName := fmt.Sprintf("/tmp/gotest-%v", rand.Int())

    err := ioutil.WriteFile(randomFileName, buf, 0666)

    if err != nil {

        return err

    }

    defer os.Remove(randomFileName)

    elapsed = time.Since(start)

    // log.Printf("generate file %s done in %s", randomFileName, elapsed)

    elapsed_g[id] = elapsed

    return nil

}


func RunWithCommonPrng() {

    start := time.Now()

    for i := 0; i < N; i++ {

        wg.Add(1)

        go GenerateFile(start, i)

    }

    wg.Wait()

    elapsed := time.Since(start)

    SortAndLogElapsed("common PRNG: ")

    log.Printf("done in %s", elapsed)

}


func GenerateFilePrivatePrng(id int, prng rand.Source, start time.Time) error {

    defer wg.Done()

    elapsed := time.Since(start)

    buf := make([]byte, 7500000)

    rand.New(prng).Read(buf) // generate random data

    randomFileName := fmt.Sprintf("/tmp/gotest-%v", prng.Int63())

    err := ioutil.WriteFile(randomFileName, buf, 0666)

    if err != nil {

        return err

    }

    defer os.Remove(randomFileName)

    elapsed = time.Since(start)

    elapsed_g[id] = elapsed

    // log.Printf("generate file %s with private source: done in %s", randomFileName, elapsed)

    return nil

}


func RunWithPrivatePrng() {

    start := time.Now()

    for i := 0; i < N; i++ {

        wg.Add(1)

        go GenerateFilePrivatePrng(i, rand.NewSource(int64(i)), start)

    }

    wg.Wait()

    elapsed := time.Since(start)

    SortAndLogElapsed("Private PRNG: ")

    log.Printf("done in %s", elapsed)

}


func GenerateFileWithGivenData(id int, buf []byte, start time.Time) error {

    defer wg.Done()

    randomFileName := fmt.Sprintf("/tmp/gotest-%v", rand.Int())

    err := ioutil.WriteFile(randomFileName, buf, 0666)

    if err != nil {

        return err

    }

    defer os.Remove(randomFileName)

    elapsed := time.Since(start)

    elapsed_g[id] = elapsed

    // log.Printf("generate file %s with data: done in %s", randomFileName, elapsed)

    return nil

}


func RunWithCommonData() {

    buf := make([]byte, 7500000)

    rand.Read(buf) // generate random data


    start := time.Now()

    for i := 0; i < N; i++ {

        wg.Add(1)

        go GenerateFileWithGivenData(i, buf, start)

    }

    wg.Wait()

    elapsed := time.Since(start)

    SortAndLogElapsed("Common data: ")

    log.Printf("done in %s", elapsed)

}


func main() {

    log.Printf("Used CPUs / Max CPUs: %d/%d", runtime.GOMAXPROCS(0), runtime.NumCPU())


    RunWithCommonPrng()

    RunWithPrivatePrng()

    RunWithCommonData()

}

在 8 核 CPU 和 SSD 上的输出是这样的:


2022/10/02 00:00:08 Used CPUs / Max CPUs: 8/8

common PRNG:  9.943335ms

common PRNG:  15.12122ms

common PRNG:  20.856216ms

common PRNG:  26.636462ms

common PRNG:  32.041066ms

common PRNG:  37.450744ms

common PRNG:  43.286644ms

common PRNG:  48.695199ms

common PRNG:  54.518533ms

common PRNG:  59.858065ms

common PRNG:  65.620084ms

common PRNG:  71.111171ms

common PRNG:  76.388583ms

common PRNG:  81.609326ms

common PRNG:  87.465878ms

common PRNG:  92.623557ms

common PRNG:  98.35468ms

common PRNG:  103.606529ms

common PRNG:  109.28623ms

common PRNG:  114.981873ms

common PRNG:  120.26626ms

common PRNG:  125.530811ms

common PRNG:  131.222195ms

common PRNG:  136.399946ms

common PRNG:  142.305635ms

common PRNG:  147.687525ms

common PRNG:  153.002392ms

common PRNG:  158.769948ms

common PRNG:  164.241503ms

common PRNG:  169.531355ms

2022/10/02 00:00:08 done in 170.273377ms

Private PRNG:  16.255543ms

Private PRNG:  17.155624ms

Private PRNG:  17.477437ms

Private PRNG:  17.49527ms

Private PRNG:  17.521759ms

Private PRNG:  18.363554ms

Private PRNG:  19.800906ms

Private PRNG:  30.340522ms

Private PRNG:  31.551496ms

Private PRNG:  40.583626ms

Private PRNG:  54.682705ms

Private PRNG:  54.832006ms

Private PRNG:  54.983126ms

Private PRNG:  55.143073ms

Private PRNG:  56.517272ms

Private PRNG:  56.577967ms

Private PRNG:  57.718ms

Private PRNG:  58.770033ms

Private PRNG:  59.246808ms

Private PRNG:  59.608246ms

Private PRNG:  59.789123ms

Private PRNG:  60.028814ms

Private PRNG:  68.533662ms

Private PRNG:  69.606317ms

Private PRNG:  69.837988ms

Private PRNG:  71.488161ms

Private PRNG:  71.770842ms

Private PRNG:  72.036881ms

Private PRNG:  72.23509ms

Private PRNG:  73.037337ms

2022/10/02 00:00:08 done in 73.694825ms

Common data:  5.220506ms

Common data:  5.220523ms

Common data:  5.220524ms

Common data:  5.220526ms

Common data:  5.221125ms

Common data:  5.221169ms

Common data:  5.222472ms

Common data:  6.977304ms

Common data:  13.601358ms

Common data:  13.614532ms

Common data:  13.859067ms

Common data:  14.75378ms

Common data:  16.00253ms

Common data:  16.111086ms

Common data:  16.263291ms

Common data:  16.42076ms

Common data:  17.024946ms

Common data:  17.313631ms

Common data:  17.749351ms

Common data:  18.18497ms

Common data:  18.83511ms

Common data:  21.789867ms

Common data:  22.308659ms

Common data:  22.308701ms

Common data:  22.546815ms

Common data:  23.298865ms

Common data:  23.482138ms

Common data:  23.610855ms

Common data:  23.667347ms

Common data:  24.500486ms

2022/10/02 00:00:08 done in 25.205652ms

“Common data”(公共数据)这一组使用的是预先生成的缓冲区。它表明 Go 确实是并行写入磁盘的:运行时把 goroutine 分配到各个线程上,这些 goroutine 会一直占用 CPU 核心,直到 I/O 完成。


更新


这是打印 Linux 线程 ID 和 CPU 编号的代码。


package main


/*

#define _GNU_SOURCE

#include <sched.h>

*/

import "C"


import (

    "fmt"

    "io/ioutil"

    "log"

    "math/rand"

    "os"

    "runtime"

    "sort"

    "sync"

    "syscall"

    "time"


    "github.com/pkg/profile"

)


func GetCpu() int {

    var ret C.int = C.sched_getcpu()

    return int(ret)

}


// GetThreadId returns the Linux thread ID of the OS thread the calling
// goroutine is running on at the moment of the call, as reported by
// syscall.Gettid.
func GetThreadId() int {
	tid := syscall.Gettid()
	return tid
}


// wg counts the writer goroutines that are still running. RunWithCommonData
// calls wg.Add(1) before starting a worker, and every worker calls wg.Done
// when it returns.
var wg sync.WaitGroup


// N is the number of goroutines (and therefore files) RunWithCommonData
// starts, and the length of elapsed_g.
const N = 30


// elapsed_g holds one duration per worker, stored at the index given by the
// worker's id argument. SortAndLogElapsed sorts it in place, so after a run
// an index no longer identifies the worker that produced the value.
var elapsed_g [N]time.Duration


func SortAndLogElapsed(prefix string) {

    sort.Slice(elapsed_g[:], func(i, j int) bool { return elapsed_g[i].Nanoseconds() < int64(elapsed_g[j].Nanoseconds()) })

    for _, elapsed := range elapsed_g {

        fmt.Println(prefix, elapsed)

    }

}


func GenerateFileWithGivenData(id int, buf []byte, start time.Time) error {

    defer wg.Done()

    randomFileName := fmt.Sprintf("/tmp/gotest-%v", rand.Int())

    tid := GetThreadId()

    cpu := GetCpu()


    before := time.Now()

    fmt.Printf("Before WriteFile:\t----\t%d\t%d\t%d\t%s\n", id, tid, cpu, before.String())

    err := ioutil.WriteFile(randomFileName, buf, 0666)

    after := time.Now()

    tid = GetThreadId()

    cpu = GetCpu()

    fmt.Printf("After WriteFile:\t%d\t%d\t%d\t%d\t%s\n", after.Sub(before).Microseconds(), id, tid, cpu, after.String())

    if err != nil {

        return err

    }

    defer os.Remove(randomFileName)

    elapsed := time.Since(start)

    elapsed_g[id] = elapsed

    // log.Printf("generate file %s with data: done in %s", randomFileName, elapsed)

    return nil

}


func RunWithCommonData() {

    buf := make([]byte, 7500000)

    rand.Read(buf) // generate random data

    fmt.Printf("                \tElapsed\tG\tTID\tCPU\ttime\n")


    start := time.Now()

    println("")

    for i := 0; i < N; i++ {

        wg.Add(1)

        go GenerateFileWithGivenData(i, buf, start)

    }

    wg.Wait()

    elapsed := time.Since(start)

    SortAndLogElapsed("Common data: ")

    log.Printf("done in %s", elapsed)

}


func main() {


    log.Printf("Used CPUs / Max CPUs: %d/%d", runtime.GOMAXPROCS(0), runtime.NumCPU())


    // RunWithCommonPrng()

    // RunWithPrivatePrng()

    defer profile.Start(profile.CPUProfile).Stop()

    RunWithCommonData()

}

我的系统上的输出如下,按时间排序(Elapsed - WriteFile 的耗时,单位为微秒;G - 传给 goroutine 的编号;TID - Linux 线程 ID;CPU - CPU 编号;最后一列是时间戳):


                  Elapsed    G  TID     CPU     time

Before WriteFile:   ----    29  23379   0   2022-10-03 20:24:47.35247545 +0900 KST m=+0.006016977

Before WriteFile:   ----    0   23380   1   2022-10-03 20:24:47.352475589 +0900 KST m=+0.006017128

Before WriteFile:   ----    14  23383   7   2022-10-03 20:24:47.352506383 +0900 KST m=+0.006047950

Before WriteFile:   ----    7   23381   2   2022-10-03 20:24:47.352572666 +0900 KST m=+0.006114235

Before WriteFile:   ----    10  23377   6   2022-10-03 20:24:47.352634156 +0900 KST m=+0.006175692

Before WriteFile:   ----    8   23384   4   2022-10-03 20:24:47.352727575 +0900 KST m=+0.006269119

Before WriteFile:   ----    9   23385   5   2022-10-03 20:24:47.352766795 +0900 KST m=+0.006308348

After WriteFile:    4133    14  23383   7   2022-10-03 20:24:47.356640341 +0900 KST m=+0.010181880

After WriteFile:    4952    7   23381   2   2022-10-03 20:24:47.357525386 +0900 KST m=+0.011066917

After WriteFile:    5049    29  23379   0   2022-10-03 20:24:47.357525403 +0900 KST m=+0.011066934

After WriteFile:    4758    9   23385   5   2022-10-03 20:24:47.3575254 +0900 KST m=+0.011066928

After WriteFile:    4892    10  23377   6   2022-10-03 20:24:47.357526773 +0900 KST m=+0.011068303

After WriteFile:    5051    0   23380   1   2022-10-03 20:24:47.35752678 +0900 KST m=+0.011068311

After WriteFile:    4801    8   23384   4   2022-10-03 20:24:47.357529101 +0900 KST m=+0.011070629

Before WriteFile:   ----    12  23380   1   2022-10-03 20:24:47.357554923 +0900 KST m=+0.011096462

Before WriteFile:   ----    13  23377   6   2022-10-03 20:24:47.357555161 +0900 KST m=+0.011096695

Before WriteFile:   ----    1   23381   2   2022-10-03 20:24:47.357555163 +0900 KST m=+0.011096697

Before WriteFile:   ----    2   23381   2   2022-10-03 20:24:47.35756292 +0900 KST m=+0.011104452

Before WriteFile:   ----    11  23377   6   2022-10-03 20:24:47.3575642 +0900 KST m=+0.011105730

Before WriteFile:   ----    21  23385   5   2022-10-03 20:24:47.357570038 +0900 KST m=+0.011111568

Before WriteFile:   ----    25  23383   7   2022-10-03 20:24:47.357572217 +0900 KST m=+0.011113747

Before WriteFile:   ----    26  23379   0   2022-10-03 20:24:47.358768915 +0900 KST m=+0.012310542

Before WriteFile:   ----    27  23384   4   2022-10-03 20:24:47.361560776 +0900 KST m=+0.015102306

After WriteFile:    4020    25  23383   7   2022-10-03 20:24:47.361593063 +0900 KST m=+0.015134592

After WriteFile:    4873    12  23380   1   2022-10-03 20:24:47.362428015 +0900 KST m=+0.015969540

After WriteFile:    4858    21  23385   5   2022-10-03 20:24:47.362428103 +0900 KST m=+0.015969632

After WriteFile:    4865    2   23381   2   2022-10-03 20:24:47.362428238 +0900 KST m=+0.015969769

After WriteFile:    4864    11  23377   6   2022-10-03 20:24:47.362428347 +0900 KST m=+0.015969877

Before WriteFile:   ----    15  23385   5   2022-10-03 20:24:47.362454039 +0900 KST m=+0.015995570

Before WriteFile:   ----    28  23380   1   2022-10-03 20:24:47.362454041 +0900 KST m=+0.015995573

Before WriteFile:   ----    23  23377   6   2022-10-03 20:24:47.362454121 +0900 KST m=+0.015995651

Before WriteFile:   ----    16  23385   5   2022-10-03 20:24:47.362462845 +0900 KST m=+0.016004374

Before WriteFile:   ----    22  23377   6   2022-10-03 20:24:47.362479715 +0900 KST m=+0.016021242

After WriteFile:    4902    26  23379   0   2022-10-03 20:24:47.363671623 +0900 KST m=+0.017213150

Before WriteFile:   ----    18  23386   6   2022-10-03 20:24:47.365182522 +0900 KST m=+0.018724057

After WriteFile:    8764    13  23383   7   2022-10-03 20:24:47.366320071 +0900 KST m=+0.019861611

Before WriteFile:   ----    17  23379   0   2022-10-03 20:24:47.366374805 +0900 KST m=+0.019916338

After WriteFile:    4902    27  23384   4   2022-10-03 20:24:47.366463028 +0900 KST m=+0.020004556

After WriteFile:    4729    28  23380   1   2022-10-03 20:24:47.367183315 +0900 KST m=+0.020724852

After WriteFile:    4720    16  23385   5   2022-10-03 20:24:47.367183317 +0900 KST m=+0.020724850

Before WriteFile:   ----    19  23385   5   2022-10-03 20:24:47.367230069 +0900 KST m=+0.020771602

Before WriteFile:   ----    20  23384   4   2022-10-03 20:24:47.367748633 +0900 KST m=+0.021290163

Before WriteFile:   ----    3   23391   3   2022-10-03 20:24:47.368046383 +0900 KST m=+0.021587923

Before WriteFile:   ----    5   23388   1   2022-10-03 20:24:47.36857915 +0900 KST m=+0.022120682

Before WriteFile:   ----    4   23380   1   2022-10-03 20:24:47.368590097 +0900 KST m=+0.022131628

Before WriteFile:   ----    6   23393   2   2022-10-03 20:24:47.370493582 +0900 KST m=+0.024035118

After WriteFile:    10260   22  23377   6   2022-10-03 20:24:47.372740578 +0900 KST m=+0.026282112

After WriteFile:    5326    20  23384   4   2022-10-03 20:24:47.37307519 +0900 KST m=+0.026616720

After WriteFile:    10922   23  23387   0   2022-10-03 20:24:47.373376163 +0900 KST m=+0.026917695

After WriteFile:    5613    3   23391   3   2022-10-03 20:24:47.373660058 +0900 KST m=+0.027201605

After WriteFile:    5332    4   23380   1   2022-10-03 20:24:47.373922339 +0900 KST m=+0.027463865

After WriteFile:    8871    18  23377   6   2022-10-03 20:24:47.374053982 +0900 KST m=+0.027595513

After WriteFile:    7880    17  23384   4   2022-10-03 20:24:47.374255159 +0900 KST m=+0.027796694

After WriteFile:    12127   15  23387   0   2022-10-03 20:24:47.37458126 +0900 KST m=+0.028122790

After WriteFile:    7422    19  23391   3   2022-10-03 20:24:47.374652483 +0900 KST m=+0.028194020

Before WriteFile:   ----    24  23377   6   2022-10-03 20:24:47.375338247 +0900 KST m=+0.028879777

After WriteFile:    5111    6   23393   2   2022-10-03 20:24:47.375605341 +0900 KST m=+0.029146871

After WriteFile:    19459   1   23392   5   2022-10-03 20:24:47.377014458 +0900 KST m=+0.030555986

After WriteFile:    3847    24  23377   6   2022-10-03 20:24:47.379185393 +0900 KST m=+0.032726920

After WriteFile:    10778   5   23388   0   2022-10-03 20:24:47.379358058 +0900 KST m=+0.032899584

它表明 goroutines 在我的 8 核 CPU 的所有内核上的许多不同线程中运行。看起来最快的 IO 是在那些保留了线程和 CPU 的 goroutine 中。而且似乎停放/取消停放线程会使阻塞 IO 变慢。


我用 100 个 goroutines 运行相同的代码。最坏的情况有 60 毫秒那么大,但它不是最后一个,中间的一个。即使在最后,也有 5.5 毫秒的快速写入。


查看完整回答
反对 回复 2023-02-21
  • 1 回答
  • 0 关注
  • 291 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信