协程(Goroutine)是Go语言中的一种轻量级调度单元,它允许在一个单独的进程空间内并发执行多个任务。Go语言通过内置的运行时管理协程,这使得创建和调度协程变得非常高效。与传统的线程相比,协程占用的内存更少,创建和销毁的开销也更小。信道(Channel)则是配合协程使用的,协程可以从信道中存取内容,实现协程之间的通信。
这篇笔记我们介绍Go语言中的协程和信道,以及如何使用它们来实现并发编程。
我们知道线程是由操作系统提供的并发执行的功能,它们是操作系统调度的基本单位。而协程则是一种程序代码层面上的概念,它允许实现多任务的并发执行,但调度由程序自身控制,不依赖于操作系统的线程调度。在Go语言中,协程被称为Goroutine,它们是由Go运行时管理的,而不是操作系统。这意味着Goroutine的创建、执行和调度都是在用户态完成的,这使得它们比操作系统线程更加轻量级和高效。因此,协程可以在不增加太多资源负担的情况下,轻松创建成千上万个实例,这在处理大量并发任务时非常有用。
此外,虽然Go语言中没有线程的概念,但这并不是说Go语言不能利用多核处理器并行处理数据,Go语言运行时在底层实现了对多核处理器的使用,只是我们通常不必对其过多关注。
下面代码使用协程运行了一个函数,函数中会使用time.Sleep()
阻塞0.5秒,然后打印信息返回。
package main
import (
"fmt"
"time"
"sync"
)
var wg = sync.WaitGroup{}
func myCoroutine(msg string) {
time.Sleep(500 * time.Millisecond)
fmt.Println(msg)
wg.Done()
}
func main() {
fmt.Println("main")
wg.Add(1)
go myCoroutine("coroutine")
wg.Wait()
}
Go和其它语言一样,如果主协程退出子协程也就被销毁了,所以如果不使用WaitGroup
,我们永远也看不到子协程执行完成。那么如何在最后等待所有协程退出再退出主协程呢?这需要一个安全标志变量来记录是否所有的子协程已退出。不过我们不用自己定义这样的一个标志量,WaitGroup
就是一个现成的工具,它设计的像是一个计数器,我们开启一个协程,调用Add(1)
计数器就加1,协程执行完就调用Done()
,计数器自动减1。主协程最后调用Wait()
,如果计数器还未减为0,主协程就会阻塞等待,直到所有协程都成功执行完成。
单纯的让Goroutine并发执行并没有什么特殊意义,我们还需要在Goroutine之间传递数据,Go语言中这可以通过信道(Channel)类型实现,信道使用<-
操作符发送和接收数据,下面是一个例子。
package main
import (
"fmt"
"time"
)
func calcSum(num int, c chan int) {
time.Sleep(1000 * time.Millisecond)
sum := 0
for i := 1; i <= num; i++ {
sum += i
}
c <- sum
}
func main() {
start := time.Now()
c := make(chan int)
go calcSum(100, c)
go calcSum(101, c)
sum, sum2 := <- c, <- c
fmt.Println(sum, sum2)
timeSpend := time.Since(start)
fmt.Println(timeSpend)
}
这里假设上面代码中calcSum()
是个耗时操作,我们起了两个协程,并将一个信道作为参数传递到协程中。
信道如果没有数据,接收端就会阻塞,我们在主协程编写了sum, sum2 := <- c, <- c
,这要求必须拿到两个返回结果,在两个协程都执行完之前,主协程会阻塞等待,因此我们此处不需要WaitGroup
来等待了。
前面我们使用的是无缓冲区信道,无缓冲信道在发送操作时会阻塞直到有接收者接收数据,而在接收操作时会阻塞直到有发送者发送数据。Go语言还支持为信道设置一个缓冲区大小,这样它就表现的就会像一个队列。信道满时发送端会阻塞,信道空时接收端会阻塞。
ch := make(chan int, 100)
信道发送端可以调用close()
显式关闭一个信道,当没有值可用时,如果用v, ok := <-ch
的形式访问信道内的值,ok
会返回false。更方便的做法是使用range
,像访问数组一样访问信道,下面是一个例子。
package main
import "fmt"
func producer(c chan int) {
for i := 0; i < 1000; i++ {
c <- i
}
close(c)
}
func main() {
c := make(chan int, 100)
go producer(c)
for i := range c {
fmt.Println(i)
}
}
注意:在上面例子中,子协程中必须用close()
关闭信道,否则主协程根本不知道何时信道上的消息发送完成,理论上会一直阻塞等待。当然如果你真的尝试了一下会发现如果去掉close(c)
这一行代码,程序会报如下错误:
fatal error: all goroutines are asleep - deadlock!
Go语言中,使用select
关键字能够实现在多个信道上阻塞等待,如果有一个信道有信息输出,就会执行对应的分支。
package main
import (
"time"
"fmt"
)
func producerA(c chan string) {
for i := 0; i < 100; i++ {
time.Sleep(100 * time.Millisecond)
c <- "A"
}
close(c)
}
func producerB(c chan string) {
for i := 0; i < 100; i++ {
time.Sleep(100 * time.Millisecond)
c <- "B"
}
close(c)
}
func main() {
c1 := make(chan string, 100)
c2 := make(chan string, 100)
go producerA(c1)
go producerB(c2)
select {
case c := <-c1:
fmt.Println(c)
case c := <-c2:
fmt.Println(c)
}
}
上面代码可能输出A
也可能输出B
,只要两个协程有一个先返回,主协程就会返回。select
语句也可以加一个默认的default
分支,如果当前等待的其他信道都没有数据要接收就会进入default
分支。
select {
case c := <-c1:
fmt.Println(c)
case c := <-c2:
fmt.Println(c)
default:
fmt.Println("暂无")
}
虽然Go语言协程十分轻量级但这并不意味着我们能无限创建协程,很多需求场景也要求我们控制并发量的最大值,在传统多线程并发模型中这需要通过设置线程池来实现,Go语言中则可以使用带缓冲信道作为信号量来控制并发数。
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(s string, semaphore chan struct{}) {
defer func() { <-semaphore }()
defer func() { wg.Done() }()
time.Sleep(1000 * time.Millisecond)
fmt.Println(s)
}
func main() {
wg = sync.WaitGroup{}
semaphore := make(chan struct{}, 3)
for i := 0; i < 30; i++ {
wg.Add(1)
semaphore <- struct{}{}
go worker("goroutine "+strconv.Itoa(i), semaphore)
}
wg.Wait()
}
代码中我们使用了一个缓冲区大小为3个信道,在for
循环中我们向这个信道不断写入数据,协程执行完成后则会从信道中去除数据来腾出缓冲区供继续写入,当缓冲区已满子协程又未消费时,主协程就会阻塞等待,这样我们就将这个程序的并发数控制为了3。此外,我们创建的信号量信道类型为chan struct{}
,这是因为struct{}
类型不占用任何内存空间,所以使用它作为信号量信道类型是一种常见的写法。
锁(Lock)是并发编程中的重要概念,它用于防止多个并发逻辑单元同时访问共享资源,从而避免数据竞争。Go语言提供了多种锁机制,包括互斥锁sync.Mutex
和读写锁sync.RWMutex
。这两种锁很容易理解,互斥锁仅允许同一时间有一个并发单元访问共享数据;读写锁则粒度更细一些,它允许多个读操作并发执行,但写操作是互斥的,这种锁适用于读多写少的场景。下面例子演示了互斥锁的使用。
package main
import (
"fmt"
"sync"
)
var (
count int
mutex sync.Mutex
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mutex.Lock()
count++
mutex.Unlock()
}()
}
wg.Wait()
fmt.Println(count)
}
代码中我们循环创建了100个协程,每个协程对全局变量count
加1,但为了防止数据竞争,我们使用了互斥锁来保护count
变量,程序运行完成后我们可以看到输出值为100。这里如果去掉加锁和解锁代码,我们会看到输出的值是不确定的。