还记得前半年去某条面试,面试小哥问我,当一个进程启动多个goroutine时,某个goroutine挂了,如何让主进程知道,当时大概知道可以通过context这个包来实现,但是当时没有具体去了解和熟悉这块,在这里再总结下。

子goroutine与主线程同步的集中方式:

  • channel: 每个goroutine往主进程的chan写数据,然后由主进程去读取,直到读取完了全部goroutine的chan就算运行完毕,此时主进程即可正常退出。这种方式是子线程通知主线程结束.
  • context: 使用context中的cancel,这种模式是主线程通知子线程结束
  • sync.WaitGroup: 通过Add方法设置等待子goroutine的数量,使用Done方法设置等待子goroutine的数量减1,当等待数量为0时,Wait函数退出.

1.通过channel实现同步

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ cat channel-sync.go
package main
import (
    "fmt"
)
// 通过向主进程中的channel来和主进程进行通信
func subTask(c chan int) {
    defer close(c)
    for i :=0;i< 10;i++ {
        c <- i
    }
}

func main() {
    isok := make(chan int,10)
    go subTask(isok)
    // 通过channel来让子goroutine和主线程共享内存(通过通信实现共享内存)
    for v := range isok {
        fmt.Println(v)
    }
}
$ go run channel-sync.go
0
1
2
3
4
5
6
7
8
9

2.context方式传递数据给主线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
$ cat context-sync.go
package main
import (
    "context"
    "fmt"
)

func subTask(ctx context.Context) (chan int) {
    dst := make(chan int)
    n := 0
    go func() {
				// 通过使用select执行和channel相关的IO操作(类似switch)
				// select关键字其实是go并发模型中比较重要的
        for {
            select {
                case <- ctx.Done():
                    return
                case dst <- n:
                    n++
            }
        }
    }()
    return dst
}

func main() {
    ctx,cancel := context.WithCancel(context.Background())
    defer cancel()

    // subTask中其实是一个死循环会不断将n自增并返回到dst
    testChan := subTask(ctx)

    for n := range testChan {
        // 主进程通过chan中的值控制并发?然后通过cancel()来通知子routine结束
        if n == 9 {
          break
        }
        fmt.Println(n)
    }

}

$ go run context-sync.go
0
1
2
3
4
5
6
7
8

3.通过sync.WaitGroup来实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
$ cat waitgroup-sync.go
package main
import (
    "sync"
    "fmt"
)

func subTask(wg *sync.WaitGroup){
    // 在并发函数中通知waitgroup完成
    defer wg.Done()
    for i := 0;i < 10;i++ {
        fmt.Println(i)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    // 设置waitgroup等待次数(并发次数)
    wg.Add(2)
    for i:=0;i<2;i++ {
      go subTask(wg)
    }
    wg.Wait()
}

$go run waitgroup-sync.go
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9

4.多个子goroutine之间通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
$ cat mutil-goroutines-sync.go
package main
import (
    "fmt"
)

// 两个task互相协作,通过两个channel来互相通知对方执行的阻塞和继续
// 因为c2其实是一个带缓冲的(1)的channel,会阻塞主直到另外一个task处理
func task1(c1 chan bool,c2 chan bool) {
    for i:= 1;i< 11;i += 2{
        // c2是一个带缓冲的channel,因此第一次打印12后到这里会等待读取c2
        <- c2
        fmt.Printf("%d",i)
        fmt.Printf("%d",i+1)
        c1 <- true

    }
}

func task2(c1 chan bool,c2 chan bool,c3 chan struct{}) {
    char_seq := [...]string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"}
    for i:=0;i<10;i += 2{
        <- c1
        fmt.Printf("%s", char_seq[i])
        fmt.Printf("%s", char_seq[i+1])
        c2 <- true
    }
    // 通知主进程任务执行完成
    c3 <- struct{}{}
}


func main() {
    c1 := make(chan bool)
    // 通过有缓冲的channel来争抢执行
    c2 := make(chan bool,1)
    // 定义一个struct{}类型的channel
    done := make(chan struct{})

    // 创建两个goroutine在后台执行
    go task1(c1,c2)
    go task2(c1,c2,done)

    // 继续执行,此时channel c2开始通过值告诉task1开始执行
    fmt.Println("begin")
    c2 <- true
    // 通过一个struct{}类型的channel将主进程阻塞住
    <- done
}

$ go run mutil-goroutines-sync.go
begin
12AB34CD56EF78GH910IJ%

知识星球

公众号