一、引言
Golang 作为一门现代编程语言,以其出色的并发性能和简洁的语法而备受青睐。在 Golang 中,并发原语是实现高效并发编程的关键。它们为开发者提供了强大的工具,用于处理多任务并发执行的各种场景。本文将详细介绍 Golang 中的并发原语,包括其概念、使用场景以及示例代码,帮助读者更好地理解和运用这些重要的编程元素。
二、并发原语概述
并发原语是 Golang 中用于支持并发编程的机制,它们使得程序能够在同一时间内处理多个任务,提高程序的执行效率和资源利用率。Golang 中的主要并发原语包括go routine、channel、select以及sync包中的相关组件。
(一)go routine
go routine是 Golang 中轻量级的线程实现,它允许我们在一个进程中同时运行多个函数。通过使用go关键字,我们可以轻松地将一个函数转换为一个go routine,使其在后台独立运行。
(二)channel
channel用于在go routine之间进行通信和同步。它提供了一种安全的方式来传递数据,确保在多个go routine访问共享数据时不会出现数据竞争等并发问题。
(三)select
select语句用于同时监听多个channel的操作,如读取或写入。它可以根据channel的状态来决定执行哪个分支,实现高效的多路复用。
(四)sync包
sync包提供了一系列用于并发控制的工具,如互斥锁(Mutex)、读写锁(RWMutex)、等待组(WaitGroup)等,用于保证共享资源的安全访问和协调多个go routine的执行。
groutine
- Go 中的并发执行单位,类似于轻量级的线程。
- Goroutine 的调度由 Go 运行时管理,用户无需手动分配线程。
- 使用 go 关键字启动 Goroutine。
- Goroutine 是非阻塞的,可以高效地运行成千上万个 Goroutine。
其实很简单 用go前缀开启即可
go functname()
package mainimport ("fmt""time"
)func sayHello() {for i := 0; i < 5; i++ {fmt.Println("Hello")time.Sleep(100 * time.Millisecond)}
}func main() {go sayHello() // 启动 Goroutinefor i := 0; i < 5; i++ {fmt.Println("Main")time.Sleep(100 * time.Millisecond)}
}
执行以上代码,你会看到输出的 Main 和 Hello。输出是没有固定先后顺序,因为它们是两个 goroutine 在执行:
Main
Hello
Main
Hello
使用场景
- 并发任务处理:http 服务器,为每个请求启动一个goroutine 进行任务处理
- 异步调用:主流程对调用结果不关心的情况下,可以通过goroutine来模拟异步调用
- 后台任务:如后台定时任务,定期执行某些操作
- 并行计算:将一个大任务拆分成若干个小任务并行执行,加快整体计算速度
注意!
当main中开启了协程,但是main函数提前结束了(没写for循环或者sleep等),此时协程也会提前结束
示例
package mainimport ("fmt"
)func sendEmail() {// 模拟发送邮件的耗时操作fmt.Println("发送邮件中...")
}func main() {go sendEmail()fmt.Println("主流程继续执行...")
}
在这个例子中,sendEmail函数在go routine中异步执行,主流程不会等待邮件发送完成,而是继续执行后续代码。
package mainimport ("fmt""net/http"
)func handleRequest(w http.ResponseWriter, r *http.Request) {// 模拟处理请求的耗时操作fmt.Println("处理请求中...")
}func main() {http.HandleFunc("/", handleRequest)err := http.ListenAndServe(":8080", nil)if err!= nil {fmt.Println("启动HTTP服务器失败:", err)}
}
在处理大量并发请求的场景中,如 HTTP 服务器,每个请求都可以启动一个go routine来独立处理。这样可以提高服务器的并发处理能力,避免单个请求阻塞整个服务器。
在上述示例中,虽然没有显示的启动,但是内部是用groutine的
每次接收到 HTTP 请求时,都会在一个新的go routine中处理请求,从而实现并发处理多个请求。
channel
语法
使用 make 函数创建一个 channel,使用 <-
操作符发送和接收数据。如果未指定方向,则为双向通道。
ch <- v // 把 v 发送到通道 ch
v := <-ch // 从 ch 接收数据 并把值赋给 v
声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:
ch := make(chan int)
注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。
以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:
package mainimport "fmt"func sum(s []int, c chan int) {sum := 0for _, v := range s {sum += v}c <- sum // 把 sum 发送到通道 c
}func main() {s := []int{7, 2, 8, -9, 4, 0}c := make(chan int)go sum(s[:len(s)/2], c)go sum(s[len(s)/2:], c)x, y := <-c, <-c // 从通道 c 中接收fmt.Println(x, y, x+y)
}
输出结果为:
-5 17 12
package mainimport "fmt"func main() {ch := make(chan int)go func() {ch <- 42 // 向channel发送数据}()data := <-ch // 从channel接收数据fmt.Println("接收到的数据:", data)
}
main函数也可以理解为一个groutine
通道缓冲区
通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:
ch := make(chan int, 100)
带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。
package mainimport "fmt"func main() {// 这里我们定义了一个可以存储整数类型的带缓冲通道// 缓冲区大小为2ch := make(chan int, 2)// 因为 ch 是带缓冲的通道,我们可以同时发送两个数据// 而不用立刻需要去同步读取数据ch <- 1ch <- 2 //如果没有缓冲 这里会阻塞// 获取这两个数据fmt.Println(<-ch)fmt.Println(<-ch)
}
执行输出结果为:
1
2
- 无缓冲channe:发送接受都会阻塞,发送操作会阻塞,直到有接收者准备好接收数据。同样,接收也会阻塞,直到有发送者发送数据。这样保证了发送和接收是同步的。
- 有缓冲channel:可以存储一定数量的数据,只有当缓冲区满时,发送才会阻塞;只有当缓冲区空时,接收才会阻塞。
ps 一个有趣例子
这样写不会阻塞
注意:
1、当main中开启了协程,但是main函数提前结束了(没写for循环或者sleep等),此时协程也会提前结束
2、由于赋值和协程同时运行,所以赋值语句无法成功!这时候就要用到通道channel
3、可用匿名函数的写法,但是如果不使用管道 依然要for循环,
4,当使用无缓存的channel,main函数无需写for循环,因为这里会自动阻塞,等待写入数据。
5、当使用不超出缓存的channel不会阻塞
6、超出缓存的channel会阻塞
Go 遍历通道与关闭通道
Go 通过 range 关键字来实现遍历读取到的数据,类似于与数组或切片。格式如下:
v, ok := <-ch
如果通道接收不到数据后 ok 就为 false,这时通道就可以使用 close() 函数来关闭。
实例
package mainimport ("fmt"
)func fibonacci(n int, c chan int) {x, y := 0, 1for i := 0; i < n; i++ {c <- xx, y = y, x+y}close(c)
}func main() {c := make(chan int, 10)go fibonacci(cap(c), c)// range 函数遍历每个从通道接收到的数据,因为 c 在发送完 10 个// 数据之后就关闭了通道,所以这里我们 range 函数在接收到 10 个数据// 之后就结束了。如果上面的 c 通道不关闭,那么 range 函数就不// 会结束,从而在接收第 11 个数据的时候就阻塞了。for i := range c {fmt.Println(i)}
}
执行输出结果为:
0
1
1
2
3
5
8
13
21
34
如果不关闭channel 会报错死锁,因为main还在等待写入数据,但是不会有数据了
语法糖——使用range不用写break和for
channel的注意事项
- 避免死锁
确保发送和接收操作能够匹配,避免因为通道的两端都在等待对方而导致死锁。 - 注意会不会panic
- 优雅关闭
- 注意阻塞以及内存泄露
- 使用select处理多个通道
当需要同时监听多个通道时,使用 select 语句可以提高代码的效率和可读性。 - 慎用全局通道
全局通道可能导致难以调试的并发问题,尽量在局部范围内使用通道。
注意:对已经关闭的通道再执行 close 也会引发 panic。
关闭(close)未初始化的channel会引起panic。
nil channel代表channel未初始化,向未初始化的channel读写数据会造成阻塞
从一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值,并不会引起panic。
死锁
go 语言新手在编译时很容易碰到这个死锁的问题:
fatal error: all goroutines are asleep - deadlock!
这个就是喜闻乐见的「死锁」了…… 在操作系统中,我们学过,「死锁」就是两个线程互相等待,耗在那里,最后程序不得不终止。go 语言中的「死锁」也是类似的,两个 goroutine 互相等待,导致程序耗在那里,无法继续跑下去。看了很多死锁的案例后,channel 导致的死锁可以归纳为以下几类案例(先讨论 unbuffered channel 的情况):
1 只有生产者,没有消费者,或者反过来
channel 的生产者和消费者必须成对出现,如果缺乏一个,就会造成死锁,例如:
// 只有生产者,没有消费者
func f1() {ch := make(chan int)ch <- 1
}
或是:
// 只有消费者,没有生产者
func f2() {ch := make(chan int)<-ch
}
2 生产者和消费者出现在同一个 goroutine 中
除了需要成对出现,还需要出现在不同的 goroutine 中,例如:
// 同一个 goroutine 中同时出现生产者和消费者
func f3() {ch := make(chan int)ch <- 1 // 由于消费者还没执行到,这里会一直阻塞住<-ch
}
3 buffered channel 已满,且出现上述情况
buffered channel 会将收到的元素先存在 hchan 结构体的 ringbuffer 中,继而才会发生阻塞。而当发生阻塞时,如果阻塞了主 goroutine ,则也会出现死锁。
所以实际使用中,推荐尽量使用 buffered channel ,使用起来会更安全,在下文的「内存泄漏」相关内容也会提及。
channel 的哪些操作会引发 panic?
1 关闭一个 nil 值的 channel 会引发 panic
package mainfunc main() {var ch chan struct{}close(ch)
}
2 关闭一个已关闭的 channel 会引发 panic
package mainfunc main() {ch := make(chan struct{})close(ch)close(ch)
}
在实际开发过程中,处理多个 goroutine 之间协作时,可能存在一个 goroutine 已经 close 掉 channel 了,另外一个不知道,也去 close 一下,就会 panic 掉,例如:
func p1() {ch := make(chan int, 1)done := make(chan struct{}, 1)go func() {<- time.After(2*time.Second)println("close2")close(ch)close(done)}()go func() {<- time.After(1*time.Second)println("close1")ch <- 1close(ch)}()<-done
}
万恶之源就是在 go 语言里,你是无法知道一个 channel 是否已经被 close 掉的,所以在尝试做 close 操作的时候,就应该做好会 panic 的准备……
3 向一个已关闭的 channel 发送数据
package mainfunc main() {ch := make(chan struct{})close(ch)ch <- struct{}{}
}
关闭
- channel不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束range循环之类的,而且确信代码逻辑很清晰 不复杂 不会引发之前关闭两次以上同一个channel的问题,才去关闭channel;
- 如果代码逻辑比较复杂,除非必须关闭 chan,否则不要主动关闭。关闭 chan 最优雅的方式,就是不要关闭 chan,等gc自己回收。
- 关闭channel后,无法向channel 再发送数据(会引发 panic 错误后导致接收立即返回零值);
- 关闭channel后,可以继续从channel接收数据;对于nil channel,无论收发都会被阻塞。
我们需要检查 channel 是否关闭吗?
刚遇到上面说的 panic 问题时,我也试过去找一个内置的 closed 函数来检查关闭状态,结果发现,并没有这样一个函数……
那么,如果有这样的函数,真能彻底解决 panic 的问题么?答案是不能。因为 channel 是在一个并发的环境下去做收发操作,就算当前执行 closed(ch) 得到的结果是 false,还是不能直接去关,例如如下 yy 出来的代码:
复制
if !closed(ch) { // 返回 false// 在这中间出了幺蛾子!close(ch) // 还是 panic 了……
}
遵循 less is more 的原则,这个 closed 函数是要不得了
2 需要 close 吗?为什么?
结论:除非必须关闭 chan,否则不要主动关闭。关闭 chan 最优雅的方式,就是不要关闭 chan~。
当一个 chan 没有 sender 和 receiver 时,即不再被使用时,GC 会在一段时间后标记、清理掉这个 chan。那么什么时候必须关闭 chan 呢?比较常见的是将 close 作为一种通知机制,尤其是生产者与消费者之间是 1:M 的关系时,通过 close 告诉下游:我收工了,你们别读了。
3 谁来关?
chan 关闭的原则:
Don’t close a channel from the receiver side 不要在消费者端关闭 chan。
Don’t close a channel if the channel has multiple concurrent senders 有多个并发写的生产者时也别关。
只要我们遵循这两条原则,就能避免两种 panic 的场景,即:向 closed chan 发送数据,或者是 close 一个 closed chan。
按照生产者和消费者的关系可以拆解成以下几类情况:
一写一读:生产者关闭即可。
一写多读:生产者关闭即可,关闭时下游全部消费者都能收到通知。
多写一读:多个生产者之间需要引入一个协调 channel 来处理信号。
多写多读:与 3 类似,核心思路是引入一个中间层以及使用try-send 的套路来处理非阻塞的写入,例如:
func main() {rand.Seed(time.Now().UnixNano())log.SetFlags(0)const Max = 100000const NumReceivers = 10const NumSenders = 1000wgReceivers := sync.WaitGroup{}wgReceivers.Add(NumReceivers)dataCh := make(chan int)stopCh := make(chan struct{})// stopCh 是额外引入的一个信号 channel.// 它的生产者是下面的 toStop channel,// 消费者是上面 dataCh 的生产者和消费者toStop := make(chan string, 1)// toStop 是拿来关闭 stopCh 用的,由 dataCh 的生产者和消费者写入// 由下面的匿名中介函数(moderator)消费// 要注意,这个一定要是 buffered channel (否则没法用 try-send 来处理了)var stoppedBy string// moderatorgo func() {stoppedBy = <-toStopclose(stopCh)}()// sendersfor i := 0; i < NumSenders; i++ {go func(id string) {for {value := rand.Intn(Max)if value == 0 {// try-send 操作// 如果 toStop 满了,就会走 default 分支啥也不干,也不会阻塞select {case toStop <- "sender#" + id:default:}return}// try-receive 操作,尽快退出// 如果没有这一步,下面的 select 操作可能造成 panicselect {case <- stopCh:returndefault:}// 如果尝试从 stopCh 取数据的同时,也尝试向 dataCh// 写数据,则会命中 select 的伪随机逻辑,可能会写入数据select {case <- stopCh:returncase dataCh <- value:}}}(strconv.Itoa(i))}// receiversfor i := 0; i < NumReceivers; i++ {go func(id string) {defer wgReceivers.Done()for {// 同上select {case <- stopCh:returndefault:}// 尝试读数据select {case <- stopCh:returncase value := <-dataCh:if value == Max-1 {select {case toStop <- "receiver#" + id:default:}return}log.Println(value)}}}(strconv.Itoa(i))}wgReceivers.Wait()log.Println("stopped by", stoppedBy)
}
内存泄露
内存泄漏
内存泄漏一般都是通过 OOM(Out of Memory) 告警或者发布过程中对内存的观察发现的,服务内存往往都是缓慢上升,直到被系统 OOM 掉清空内存再周而复始。
在 go 语言中,错误地使用 channel 会导致 goroutine 泄漏,进而导致内存泄漏。
1 如何实现 goroutine 泄漏呢?
不会修 bug,我还不会写 bug 吗?让 goroutine 泄漏的核心就是:
生产者/消费者 所在的 goroutine 已经退出,而其对应的 消费者/生产者 所在的 goroutine 会永远阻塞住,直到进程退出。
2 生产者阻塞导致泄漏
我们一般会用 channel 来做一些超时控制,例如下面这个例子:
func leak1() {ch := make(chan int)// g1go func() {time.Sleep(2 * time.Second) // 模拟 io 操作ch <- 100 // 模拟返回结果}()// g2// 阻塞住,直到超时或返回select {case <-time.After(500 * time.Millisecond):fmt.Println("timeout! exit...")case result := <-ch:fmt.Printf("result: %d\n", result)}
}
这里我们用 goroutine g1 来模拟 io 操作,主 goroutine g2 来模拟客户端的处理逻辑。
假设客户端超时为 500ms,而实际请求耗时为 2s,则 select 会走到 timeout 的逻辑,这时g2 退出,channelch 没有消费者,会一直在等待状态,输出如下:
Goroutine num: 1
timeout! exit...
Goroutine num: 2
如果这是在 server 代码中,这个请求处理完后,g1 就会挂起、发生泄漏了,就等着 OOM 吧 = =。
假设客户端超时调整为 5000ms,实际请求耗时 2s,则 select 会进入获取 result 的分支,输出如下:
Goroutine num: 1
timeout! exit...
Goroutine num: 2
3 消费者阻塞导致泄漏
如果生产者不继续生产,消费者所在的 goroutine 也会阻塞住,不会退出,例如:
func leak2() {ch := make(chan int)// 消费者 g1go func() {for result := range ch {fmt.Printf("result: %d\n", result)}}()// 生产者 g2ch <- 1ch <- 2time.Sleep(time.Second) // 模拟耗时fmt.Println("main goroutine g2 done...")
}
这种情况下,只需要增加 close(ch) 的操作即可,for-range 操作在收到 close 的信号后会退出、goroutine 不再阻塞,能够被回收。
4 如何预防内存泄漏?
预防 goroutine 泄漏的核心就是:
- 创建 goroutine 时就要想清楚它什么时候被回收。
具体到执行层面,包括:
- 当 goroutine 退出时,需要考虑它使用的 channel 有没有可能阻塞对应的生产者、消费者的 goroutine。
- 尽量使用buffered channel使用buffered channel 能减少阻塞发生、即使疏忽了一些极端情况,也能降低 goroutine 泄漏的概率。
使用场景拓展——并发控制(以及函数参数如何使用channel)
channel可以用于控制并发的go routine数量。例如,我们可以创建一个带有缓冲区的channel,当缓冲区满时,go routine将被阻塞,从而限制同时执行的go routine数量。
package mainimport ("fmt""time"
)func worker(id int, jobs <-chan int, results chan<- int) {for j := range jobs {fmt.Printf("工人 %d 开始处理任务 %d\n", id, j)time.Sleep(time.Second)fmt.Printf("工人 %d 完成任务 %d\n", id, j)results <- j * 2}
}func main() {const numJobs = 5jobs := make(chan int, numJobs)results := make(chan int, numJobs)// 启动3个工人go routinefor w := 1; w <= 3; w++ {go worker(w, jobs, results)}// 发送任务到jobs channelfor j := 1; j <= numJobs; j++ {jobs <- j}close(jobs)// 接收并打印结果for a := 1; a <= numJobs; a++ {<-results}
}
在这个示例中,通过jobs通道控制并发任务数量,当jobs通道缓冲区满时,发送任务的go routine将被阻塞,直到有工人go routine完成任务并从jobs通道读取数据。
select
select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收。
在golang语言中,select语句 就是用来监听和channel有关的IO操作,当IO操作发生时,触发相应的case动作。有了 select语句,可以实现 main主线程 与 goroutine线程 之间的互动。
package mainimport "fmt"func main() {var c1, c2, c3 chan int // c1 c2 c3分别是一个信道var i1, i2 intselect {case i1 = <-c1:fmt.Printf("received ", i1, " from c1\n")case c2 <- i2:fmt.Printf("sent ", i2, " to c2\n")case i3, ok := (<-c3): // same as: i3, ok := <-c3if ok {fmt.Printf("received ", i3, " from c3\n")} else {fmt.Printf("c3 is closed\n")}default:fmt.Printf("no communication\n")}
}
select {case <-ch1 : // 检测有没有数据可读// 一旦成功读取到数据,则进行该case处理语句case ch2 <- 1 : // 检测有没有数据可写!// 一旦成功向ch2写入数据,则进行该case处理语句default:// 如果以上都没有符合条件,那么进入default处理流程
注:selectd的原理 第一次运行的时候 如果case不被满足 会从上到下依次执行每个case的判断条件,当执行完了后 就会采用io多路复用模型的通知 当满足条件时候再执行case体(不重新扫描了)所以下面的定时器能运行
go func() {for {select {case <-isLive:fmt.Println("hello world")case <-time.After(time.Second * 5): fmt.Println("exit")return}}}()
//注意 上面每次for 都会重新执行case的判断条件
//类似的还可用time.NewTicker(time.Second * 1)实现
sync包的使用场景及示例
等待组(WaitGroup)
等待组用于等待一组go routine完成工作。例如,在并行计算中,将一个大任务拆分成多个小任务,每个小任务在一个go routine中执行,使用等待组来等待所有小任务完成后再进行后续操作。
package mainimport ("fmt""sync"
)func worker(id int, wg *sync.WaitGroup) {defer wg.Done() // Goroutine 完成时调用 Done()fmt.Printf("Worker %d started\n", id)fmt.Printf("Worker %d finished\n", id)
}func main() {var wg sync.WaitGroupfor i := 1; i <= 3; i++ {wg.Add(1) // 增加计数器go worker(i, &wg)}wg.Wait() // 等待所有 Goroutine 完成fmt.Println("All workers done")
}
以上代码,执行输出结果如下:
Worker 1 started
Worker 1 finished
Worker 2 started
Worker 2 finished
Worker 3 started
Worker 3 finished
All workers done
在这个示例中,通过WaitGroup确保所有任务go routine都完成后,主go routine才继续执行后续代码。
互斥锁(Mutex)
互斥锁用于保护共享资源,确保在同一时刻只有一个go routine可以访问共享资源,防止数据竞争。
package mainimport ("fmt""sync"
)var (count intmutex sync.Mutex
)func increment() {mutex.Lock()count++mutex.Unlock()
}func main() {var wg sync.WaitGroupfor i := 0; i < 1000; i++ {wg.Add(1)go func() {defer wg.Done()increment()}()}wg.Wait()fmt.Println("最终计数:", count)
}
在这个示例中,多个go routine同时对count变量进行自增操作,通过互斥锁mutex保证了每次只有一个go routine能够修改count的值,避免了数据竞争。
读写锁(RWMutex)
读写锁适用于读多写少的场景,允许多个go routine同时读取共享资源,但在写入时需要独占锁。
package mainimport ("fmt""sync""time"
)var (data intrwlock sync.RWMutex
)func readData() {rwlock.RLock()fmt.Println("读取数据:", data)time.Sleep(100 * time.Millisecond)rwlock.RUnlock()
}func writeData() {rwlock.Lock()data++fmt.Println("写入数据:", data)rwlock.Unlock()
}func main() {var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func() {defer wg.Done()readData()}()}for i := 0; i < 2; i++ {wg.Add(1)go func() {defer wg.Done()writeData()}()}wg.Wait()
}
在这个示例中,多个go routine可以同时读取data变量,但在写入data变量时需要获取写锁,确保数据的一致性。
参考资料
https://www.51cto.com/article/710755.html
菜鸟
等