一个现象
假设我们有一个任务,包括两个goroutine:
- writer:负责每秒给counter加一
- reader:负责每秒打印counter的值
如果用lock的话
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| type Controller struct { cnt int mu sync.Mutex }
func (c *Controller) writer () { for { c.mu.Lock() c.cnt += 1 c.mu.Unlock() time.Sleep(time.Second) } }
func (c *Controller) reader () { for { c.mu.Lock() fmt.Printf("read: %v\n", c.cnt) c.mu.Unlock() time.Sleep(time.Second) } }
func main() { c := Controller{} go c.writer() go c.reader() time.Sleep(20 * time.Second) }
|
【这是错误的】:一开始我以为是reader和writer交替,假设writer先抢到lock,那么reader一定在lock那里block,然后等writer做完操作,释放lock并进入sleep时,reader被唤醒,进行读操作。怎么可能同时发生两次write呢?
但是运行后,发现读写(居然)并非交替运行:
1 2 3 4 5 6 7
| > go run lock.go read: 1 read: 1 read: 3 read: 3 read: 5 ...
|
原因是临界区中操作时间过短,导致两个goroutine接近同时操作完后同时进入等待,接近同时唤醒,接近同时抢lock,图中两个虚线之间则是两个goroutine同时抢lock的区间
如果给reader和writer的操作都加一点时间,则可以顺序打印,如writer改为:
1 2 3 4 5 6 7 8 9
| func (c *Controller) writer () { for { c.mu.Lock() c.cnt += 1 time.Sleep(time.Second) c.mu.Unlock() time.Sleep(time.Second) } }
|
运行验证:
1 2 3 4 5 6 7
| > go run lock.go read: 1 read: 2 read: 3 read: 4 read: 5 ...
|
用channel代替mutex
我们如何使用channel来代替以上的mutex,并实现相同的效果?
先分析Lock和Unlock操作:
1 2 3
| c.mu.Lock() c.cnt += 1 c.mu.Unlock()
|
Lock操作是block类型,也即如果没有Mutex,需要一直等待;而Unlock不是block类型,不管有没有goroutine正在Lock这里被block了,反正Unlock都能执行完。
因此,不能简单的用unbuffered channel进行替换,【错误的】例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| type void struct{} type Controller struct { cnt int ch chan void } func (c *Controller) writer() { for { <-c.ch c.cnt += 1 c.ch <- void{} time.Sleep(time.Second) } } func (c *Controller) reader() { for { <-c.ch fmt.Printf("read: %v\n", c.cnt) c.ch <- void{} time.Sleep(time.Second) } } func main() { c := Controller{ ch: make(chan void), } go c.writer() go c.reader() c.ch <- void{} time.Sleep(20 * time.Second) }
|
所以运行结果是差不多每两秒打印一个值,并且是顺序打印。
因此我们需要让Unlock对应的操作不属于block模式
buffer=1的channel
使用一个buffer=1的channel来代替mutex,则会让放数据到channel的操作block,而拿数据则由于发生同一个goroutine中的放数据之后,因此能保证channel中有数据,因此不会block。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| type void struct{}
type Controller struct { cnt int ch chan void }
func (c *Controller) writer() { for { c.ch <- void{} c.cnt += 1 <-c.ch time.Sleep(time.Second) } }
func (c *Controller) reader() { for { c.ch <- void{} fmt.Printf("read: %v\n", c.cnt) <-c.ch time.Sleep(time.Second) } }
func main() { c := Controller{ ch: make(chan void, 1), } go c.writer() go c.reader() time.Sleep(5 * time.Second) }
|
new goroutine解决block的问题
将unbuffered channel那种做法中的放数据到channel那步放到新建的goroutine中来做:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| type void struct{}
type Controller struct { cnt int ch chan void }
func (c *Controller) writer() { for { <-c.ch c.cnt += 1 go func() { c.ch <- void{} }() time.Sleep(time.Second) } }
func (c *Controller) reader() { for { <-c.ch fmt.Printf("read: %v\n", c.cnt) go func() { c.ch <- void{} }() time.Sleep(time.Second) } }
func main() { c := Controller{ ch: make(chan void), } go c.writer() go c.reader() c.ch <- void{} time.Sleep(20 * time.Second) }
|