Golang中的channel与sync.Mutex

一个现象

假设我们有一个任务,包括两个goroutine:

  1. writer:负责每秒给counter加一
  2. reader:负责每秒打印counter的值

如果用lock的话

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Controller struct {
cnt int
mu sync.Mutex
}

func (c *Controller) writer () {
for {
c.mu.Lock()
c.cnt += 1
c.mu.Unlock()
time.Sleep(time.Second)
}
}

func (c *Controller) reader () {
for {
c.mu.Lock()
fmt.Printf("read: %v\n", c.cnt)
c.mu.Unlock()
time.Sleep(time.Second)
}
}

func main() {
c := Controller{}
go c.writer()
go c.reader()
time.Sleep(20 * time.Second)
}

【这是错误的】:一开始我以为是reader和writer交替,假设writer先抢到lock,那么reader一定在lock那里block,然后等writer做完操作,释放lock并进入sleep时,reader被唤醒,进行读操作。怎么可能同时发生两次write呢?

Untitled

但是运行后,发现读写(居然)并非交替运行:

1
2
3
4
5
6
7
> go run lock.go
read: 1
read: 1
read: 3
read: 3
read: 5
...

原因是临界区中操作时间过短,导致两个goroutine接近同时操作完后同时进入等待,接近同时唤醒,接近同时抢lock,图中两个虚线之间则是两个goroutine同时抢lock的区间

Untitled

如果给reader和writer的操作都加一点时间,则可以顺序打印,如writer改为:

1
2
3
4
5
6
7
8
9
func (c *Controller) writer () {
for {
c.mu.Lock()
c.cnt += 1
time.Sleep(time.Second)
c.mu.Unlock()
time.Sleep(time.Second)
}
}

运行验证:

1
2
3
4
5
6
7
> go run lock.go
read: 1
read: 2
read: 3
read: 4
read: 5
...

用channel代替mutex

我们如何使用channel来代替以上的mutex,并实现相同的效果?

先分析Lock和Unlock操作:

1
2
3
c.mu.Lock()
c.cnt += 1
c.mu.Unlock()

Lock操作是block类型,也即如果没有Mutex,需要一直等待;而Unlock不是block类型,不管有没有goroutine正在Lock这里被block了,反正Unlock都能执行完。

因此,不能简单的用unbuffered channel进行替换,【错误的】例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type void struct{}
type Controller struct {
cnt int
ch chan void
}
func (c *Controller) writer() {
for {
// block here
<-c.ch
c.cnt += 1
// also block here (different from mutex)
c.ch <- void{}
time.Sleep(time.Second)
}
}
func (c *Controller) reader() {
for {
// block here
<-c.ch
fmt.Printf("read: %v\n", c.cnt)
// also block here (different from mutex)
c.ch <- void{}
time.Sleep(time.Second)
}
}
func main() {
c := Controller{
ch: make(chan void),
}
go c.writer()
go c.reader()
c.ch <- void{}
time.Sleep(20 * time.Second)
}

Untitled

所以运行结果是差不多每两秒打印一个值,并且是顺序打印。

因此我们需要让Unlock对应的操作不属于block模式

buffer=1的channel

使用一个buffer=1的channel来代替mutex,则会让放数据到channel的操作block,而拿数据则由于发生同一个goroutine中的放数据之后,因此能保证channel中有数据,因此不会block。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type void struct{}

type Controller struct {
cnt int
ch chan void
}

func (c *Controller) writer() {
for {
c.ch <- void{}
c.cnt += 1
<-c.ch
time.Sleep(time.Second)
}
}

func (c *Controller) reader() {
for {
c.ch <- void{}
fmt.Printf("read: %v\n", c.cnt)
<-c.ch
time.Sleep(time.Second)
}
}

func main() {
c := Controller{
ch: make(chan void, 1),
}
go c.writer()
go c.reader()
time.Sleep(5 * time.Second)
}

new goroutine解决block的问题

将unbuffered channel那种做法中的放数据到channel那步放到新建的goroutine中来做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type void struct{}

type Controller struct {
cnt int
ch chan void
}

func (c *Controller) writer() {
for {
<-c.ch
c.cnt += 1
// without block
go func() { c.ch <- void{} }()
time.Sleep(time.Second)
}
}

func (c *Controller) reader() {
for {
<-c.ch
fmt.Printf("read: %v\n", c.cnt)
// without block
go func() { c.ch <- void{} }()
time.Sleep(time.Second)
}
}

func main() {
c := Controller{
ch: make(chan void),
}
go c.writer()
go c.reader()
c.ch <- void{}
time.Sleep(20 * time.Second)
}

Golang中的channel与sync.Mutex
https://fffffaraway.github.io/2022/08/30/Golang中的channel与sync-Mutex/
Author
Song Wei
Posted on
August 30, 2022
Licensed under