Go Context和Channel
context、channel、select 原理
[TOC]
Context
一个接口,包含如下方法,主要用于实现主协程对子协程的控制,作用包括取消执行、设置超时时间、携带键值对等
type Context interface {
// 获取到期时间,如果没有,ok则返回false
Deadline() (deadline time.Time, ok bool)
// 返回一个chan,表示取消信号,如果通道关闭则代表该 Context 已经被取消;如果返回的为 nil,则代表该 Context 是一个永远不会被取消的 Context。
Done() <-chan struct{}
// 返回该 Context 被取消的原因。如果只使用 Context 包的 Context 类型的话,那么只可能返回 Canceled (代表被明确取消)或者 DeadlineExceeded (因超时而取消)
Err() error
// 获取Context中的键值对
Value(key interface{}) interface{}
}一个demo,引用:通知多个子goroutine退出运行
package main
import (
"context"
"crypto/md5"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
type favContextKey string
func main() {
wg := &sync.WaitGroup{}
values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"}
ctx, cancel := context.WithCancel(context.Background())
for _, url := range values {
wg.Add(1)
subCtx := context.WithValue(ctx, favContextKey("url"), url)
go reqURL(subCtx, wg)
}
go func() {
time.Sleep(time.Second * 3)
cancel()
}()
wg.Wait()
fmt.Println("exit main goroutine")
}
func reqURL(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
url, _ := ctx.Value(favContextKey("url")).(string)
for {
select {
// 调用Done方法检测是否有父节点调用cancel方法通知子节点退出运行, chan被close时触发
case <-ctx.Done():
fmt.Printf("stop getting url:%s\n", url)
return
default:
r, err := http.Get(url)
if r.StatusCode == http.StatusOK && err == nil {
body, _ := ioutil.ReadAll(r.Body)
subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body)))
wg.Add(1)
go showResp(subCtx, wg)
}
r.Body.Close()
//启动子goroutine是为了不阻塞当前goroutine,这里在实际场景中可以去执行其他逻辑,这里为了方便直接sleep一秒
// doSometing()
time.Sleep(time.Second * 1)
}
}
}
func showResp(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("stop showing resp")
return
default:
//子goroutine里一般会处理一些IO任务,如读写数据库或者rpc调用,这里为了方便直接把数据打印
fmt.Println("printing: ", ctx.Value(favContextKey("resp")))
time.Sleep(time.Second * 1)
}
}
}emptyCtx
go提供了两个基本的context创建,emptyCtx是int类型的重新定义,emptyCtx没有过期时间,不能被取消,不能设置value,作用仅作为context树的根节点。
cancelCtx
通过根context,比如emptyCtx之后,调用withCancel()方法可以创建cancelCtx用于取消操作。
有两种方式可触发取消:
返回的CancelFunc被调用,此时会取消当前context和其所有的子context
Done这个chan被close了,此时会取消当前context和其所有的子context
可被取消的context实现了canceler接口,具体实现:
timerCtx
可超时自动取消的context,内部使用cancelCtx + timer实现,调用WithDeadline()方法可以创建timerCtx用于超时取消操作。
WithTimeout()方法和WithDeadline()方法,效果是一样的,只是时间的含义不一样。
关于withTimeout()方法,返回的cancel函数,即使不主动调用,也不影响资源的最终释放,它到时间了也会自动调用,建议是提前主动调用,尽快释放,避免等待时间过长导致浪费。
valueCtx
valueCtx内部仍然使用Context存储父Context的指针,并用interface{}存储键值;
如果当前valueCtx找不到需要的key,会沿着树向上一直查找直到根节点,类似链表的搜索;
使用WithValue创建时,会判断key是否实现Comparable接口。如果没有实现,会触发panic;
key的类类型不应该是内置类型,以避免冲突。使用的时候应该自定义类型;
Channel
Channel的设计基于CSP模型。
CSP模型(Communicating Sequential Process,通信顺序进程),允许使用进程组来描述系统,独立运行,并且只通过消息传递的方式通信。
本质上就是,在使用协程执行函数时,不通过内存共享(会用到锁)的方式通信,而是通过Channel通信传递数据。
动画参考:https://go.xargin.com/docs/data_structure/channel/
基本
chan是引用类型,使用make关键字创建,未初始化时的零值是nil,如
ch := make(chan string, 10),创建一个能处理string的缓冲区大小为10的channel,效果相当于异步队列,除非缓冲区用完,否则不会阻塞;ch := make(chan string),则创建了一个不存在缓冲区的channel,效果相当于同步阻塞队列,len永远返回0。即 假如没有接收者,同一个方法内,连续发送两次数据,第一次如果没有被接收的话,此时就阻塞了,轮不到第二次发送,但如果size = 1,第一次发送的数据就会进入buf数组,不阻塞,到了第二次发送才阻塞;
var ch chan int表示创建了一个nil channel;channel作为通道,负责在多个goroutine间传递数据,解决多线程下共享数据竞争问题。
len()方法获取buff中未被读取的数量,即qcount的值;cap()方法获取buff数组的长度带有 <- 的chan是有方向的,不带 <- 的chan是双向的,比如
chan可以是任何类型的,比如可以是 chan<- 类型,<-总是尽量和左边的chan结合,比如
接收数据时可以有两个返回值,第一个是返回的元素,第二个是bool类型,表示是否成功地从chan中读取到一个值。如果是false,说明chan已经被close并且chan中没有缓存的数据,此时第一个元素是零值。所以,如果接收时第一个元素是零值,可能是sender真的发送了零值,也可能是closed并且没有元素导致的,所以最好通过第二个返回值来确定。
双向chan可以赋值给单向chan,但反过来不可以;
给一个nil channel发送数据,会造成永久阻塞,从一个nil channel接收数据,会造成永久阻塞;
给一个已经关闭的channel发送数据,会引起panic;
从一个已经关闭的channel接收数据,如果缓冲区为空,则返回一个零值;
已关闭的channel再次关闭,会panic;
对于一个不关闭的channel,在方法结束后,只要channel没有被引用,会被GC自动回收;
关闭channel的原则:不要向已关闭的channel发送数据或者再次关闭,关闭的动作尽量在sender做,主要还是分场景:
一个sender一个receiver的场景:在sender处关闭。
一个sender多个recevier的场景:在sender处关闭。
多个sender一个receiver的场景:增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。
多个sender多个receiver的场景:再增加一个中间的channel,用来接收标识关闭的数据,收到后直接close传递关闭的信号channel即可。
channel在关闭时会自动退出循环;
注意channel不提供跨goroutine的数据保护,如果多个channel传递一份数据的指针,使得每个goroutine可以操作同一份数据,也会出现并发安全问题;
receive
block
block
read value
read value
返回未读的元素,读完后返回零值
send
block
write value
block
writed value
panic
close
panic
closed,没有未读元素
closed,保留未读元素
closed,保留未读元素
panic
数据结构
初始化
发送数据
使用ch <- "test"发送数据,最终会调用chansend()函数发送数据,该函数设置了阻塞参数为true;
如果chan是nil,则把发送者的goroutine park(阻塞休眠),此时发送者将被永久阻塞;
如果chan没有被close,但是chan满了,则直接返回false,但是由于阻塞参数为true,这部分不会被执行;
上锁,保证线程安全,再次检查chan是否被close,如果被close,再往里发数据会触发 解锁,panic;
同步发送 - 优先发送给等待接收的G
如果没被close,当recvq存在等待的接收者时,通过
send()函数,取出第一个等待的goroutine,直接发送数据,不需要先放到buf中;send()函数将因为等待数据的接收而阻塞的goroutine的状态从Gwaiting或者Gscanwaiting改为Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时会立即执行这个等待发送数据的goroutine;异步发送 - 其次是发送到buf区
当recvq中没有等待的接收者,且buf区存在空余空间时,会使用
chanbuf()函数获取sendx索引值,计算出下一个可以存储数据的位置,然后调用typedmemmove()函数将要发送的数据拷贝到buff区,增加sendx索引和qcount计数器,完成之后解锁,返回成功;阻塞发送 - 最后才保存在待发送队列,阻塞(阻塞只发生在这里,此时G和M分离)
当recvq中没有等待的接收者,且buf区已满或不存在buf区时,会先调用
getg()函数获取正在发送者的goroutine,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)然后将该sudoG对象加入sendq队列,调用
goparkunlock()函数让当前发送者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中接收数据,等待调度器唤醒;此时len()返回值为0,数据的发送是阻塞在方法中的。
调度器唤醒后,将一些属性值设置为零,并释放sudog对象,表示向channel发送数据结束;
channel发送数据时涉及两次goroutine的调度:
当接收队列里存在sudoG可以直接发送数据时,执行
goready()函数,将G从Gwaiting或GScanwaiting转为Grunnable,等待下次调度触发,交由M执行;当没有等待接收数据的G,并且没有缓冲区,或者缓冲区已满时,执行
gopark()函数挂起当前G,将G阻塞,此时状态为Gwaiting,让出CPU等待调度器调度;
接收数据
使用 str <- ch 或 str, ok <- ch (ok用于判断ch是否关闭,如果没有ok,可能会无法分辨str接收到的零值是发送者发的还是ch关闭)接收数据,会转化为调用chanrecv1和chanrecv2函数,但最终会调用chanrecv函数接收数据。chanrecv1和chanrecv2函数都是设置阻塞参数为true。
如果chan是nil,则把接收者的goroutine park(阻塞休眠),接收者被永久阻塞;
不上锁检查 buf 区大小:如果chan的buf区大小为0 或者 没有数据可接收,检查是否被关闭,被关闭则返回;如果没被关闭,则再次检查buf区大小是否为0 或者 没有数据可接收,如果是,则清除ep指针中的数据并返回selected为true,received为false;
这里两次empty检查,因为第一次检查,chan可能还没关闭,但是第二次检查时关闭了,由于可能在两次检查之间有待接收的数据达到了,所以需要两次empty检查;
上锁检查buf区大小:上锁,如果chan已经被close,且buf区没有数据,清除ep指针中的数据,解锁,返回selected为true,received为false;
同步接收 - 如果无buf,消费发送等待队列中G的数据,如果buf满,先拿buf区的,发送的再加入
当chan的sendq队列存在等待状态的goroutine时(能拿到就说明要不就是buf区为0,要不就是buf区已满)
如果是无buf区的chan,直接使用
recv()函数从阻塞的发送者中获取数据;如果是有buf区的chan,说明此时buf区已满,则先从buf区中获取可接收的数据(从buf区中copy到接收者的内存),然后从sendq队列的队首中读取待发送的数据,加入到buf区中(将发送者的数据copy到buf区,替换刚刚buf区copy出去的位置),更新可接收和可发送的下标chan.recvx和sendx的值;
最后调用
goready()函数将等待发送数据而阻塞gorouotine的状态从Gwaiting 或者 Gscanwaiting 改变成 Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时立即释放这个等待发送数据的goroutine;异步接收 - 其次是消费buf区中的数据
当channel的sendq队列没有等待状态的goroutine,且buf区存在数据时,从channel的buf区中的recvx的索引位置接收数据,如果接收数据的内存地址不为空,会直接将缓冲区里的数据拷贝到内存中,清除buf区中的数据,递增recvx,递减qcount,完成数据接收;
这个和chansend共用一把锁,所以不会有并发问题;
阻塞接收 - 最后才是保存在接收等待队列,阻塞(阻塞只发生在这里,此时G和M分离)
当channel的sendq队列没有等待状态的goroutine,且buf区不存在数据时,执行
acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)然后将该sudoG对象加入待发送recvq队列,调用
goparkunlock()函数让当前接收者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中发送数据,等待调度器唤醒;此时方法会阻塞在ch的接收中,len()返回值为0;
goroutine被唤醒后,chan完成阻塞数据的接收,接收完成后进行基本的参数检查,解除chan的绑定,释放sudoG,表示接收数据完成;
channel 接收过程中包含 2 次有关 goroutine 调度过程:
当发送队列中存在 sudoG 时,调用
goready(),G 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable,等待下次调度便立即运行;当 buf 区为空,且没有发送者时,调用
gopark()挂起当前G,此时状态为Gwaiting,让出 cpu 的使用权并等待调度器的调度;
关闭
如果 chan 为 nil,close 会 panic;
上锁:
如果 chan 已经 closed,再次 close 也会 panic;
否则的话,如果 chan 不为 nil,chan 也没有 closed,设置chan的标记为closed;
优先释放所有的接收者:
将接收者等待队列中的sudoG对象加入到待清除队列glist中,这里会优先回收接收者,这样即使从close中的chan读取数据,也不会panic,最多读到默认值;
这样第6步执行的时候,才会先执行接收者,接收后面发送者的数据(接收buff数组里的数据,因为sender里的会被panic掉),否则发送者发送的数据无法被先接收。
其次是释放所有发送者: 将发送者等待队列中的sudoG对象加入到待清除队列glist中,这里可能会发生panic,因为往一个close的chan中发送数据会panic;
解锁
进行最后的调度,遍历glist中的sudoG,调用
goready()触发调度,将每个goroutine状态从 Gwaiting 转为 Grunnable状态,等待调度器调度;
应用场景
实现生产者 - 消费组模型,数据传递,比如worker池的实现
信号通知:利用 如果chan为空,那receiver接收数据的时候就会阻塞等待,直到chan被关闭或有新数据进来 的特点,将一个协程将信号(closing、closed、data ready等)传递给另一个或者另一组协程,比如 wait/notify的模式。
协程池,把要操作的逻辑封装成task,通过chan传输实现协程复用
任务编排:让一组协程按照一定的顺序并发或串行执行,比如实现waitGroup的功能
控制并发量,可以配合WaitGroup进行控制goroutine的数量
任务定时
实现互斥锁的机制,比如,容量为 1 的chan,放入chan的元素代表锁,谁先取得这个元素,就代表谁先获取了锁
共享资源的并发访问使用传统并发原语;
复杂的任务编排和消息传递使用 Channel;
消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
需要和 Select 语句结合,使用 Channel;需要和超时配合时,使用 Channel 和 Context。
注意点:使用chan要注意panic和goroutine泄露,另外,只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。
在使用chan和select配合时要注意会出现goroutine泄漏的情况:
select
结构
select在runtime中不存在结构体表示,但是case倒是有
基本
非阻塞收发:当chan中存在可接收数据,直接处理那个chan,否则执行default语句
随机执行:select遇到多个case就绪,会进行随机选择
编译器会对select中的case进行优化,总共有四种情况:
不包含任何case,即空select,此时会阻塞当前的goroutine
只包含一个case,此时select会被优化成 if ,当chan没有数据可接收时,就会把阻塞当前goroutine,直到有数据到来;如果chan是nil,就会永远阻塞当前goroutine
存在两个case,其中一个是default:
发送,这种情况下,发送是不阻塞的:
接收,这种情况下,chan有值就走case,否则走default
通用的select条件:比如select里包含多个case,会编译成通过
runtime的selectgo方法处理case,selectgo会返回 case的序号 还有 是否被接收的标识,然后被编译成多个if,用于判断选中哪个case。
selectgo的流程
获取case数组,随机打乱,确定打乱后的轮询顺序数组pollorder和加锁顺序数组lockorder,数组里存放的元素是chan
按加锁顺序数组,调用chan的锁,依次进行锁定
进入主循环,遍历 轮询顺序数组pollorder
第一阶段,查找是否已经存在准备就绪的chan(此时的chan可以执行收发操作)此时需要处理四种类型的case:
当case不包含chan时,直接跳过;
当case会从chan中接收数据时:
如果当前chan的
sendq队列上有等待的goroutine,就会跳到recv标签,如果没有buf区,则从sendq队列上获取数据,否则,从chan的buf区读取数据后,将sendq队列中等待的goroutine中的数据放入到buf区中相同的位置;如果当前chan的buf区不为空,就跳到
bufrecv标签,从chan的buf区中获取数据如果当前chan已经被关闭,就会跳到
rclose标签做一些清除的收尾工作;
当case会从chan中发送数据时:
如果当前chan已经被关闭,会直接跳到
sclose标签,触发panic;如果当前chan的
recvq队列上有等待的goroutine,就跳到send标签向chan发送数据;如果当前chan的缓冲区存在空闲位置,就会将等待发送的数据存入缓冲区中,因为select相当于有接收者了,不会出现发送阻塞的情况;
当case是default时,表示前面的所有case都没有被执行,此时会解锁所有的chan并返回(意味着当前select结构的收发都是非阻塞的),直接执行default内容;
第一阶段只是查找所有case中是否有可以立即被处理的chan,无论是数据是在等待的goroutine上,还是buf区中,只要存在数据满足条件就会立即处理,然后返回;如果不能立刻找到活跃的chan,就会进入下一循环;
第二阶段,将当前goroutine加入到chan对应的收发队列上并等待其他goroutine的唤醒:
将当前goroutine,包装成sudogo,遍历case,加入到case的chan的
sendq队列或者recvq队列中(同时,这个sudog会关联当前case的chan,然后将这些sudog组成链表,挂在当前goroutine下,用于唤醒之后的查找)
调用
gopark函数挂起当前goroutine,等待被调度器唤醒;
第三阶段,当前goroutine被唤醒后,找到满足条件的chan并进行处理:
等到select对应的chan准备好后,当前goroutine会被调度器唤醒,被唤醒后,获取当前goroutine的sudog,依次对比所有case里chan对应的sudog结构,找到被唤醒的case,并释放其他未被使用的sudog结构;
由于当前的select结构已经被挑选了其中一个case执行,剩下的case中没有被用到的sudog会被直接忽略并释放掉,为了不影响chan的正常使用,还需要将这些废弃的sudog从chan中出队;
Timer
https://www.cyhone.com/articles/analysis-of-golang-timer/
时间轮
概念理解:一张图理解Kafka时间轮
手把手教你如何用golang实现一个timewheel时间轮

总结:
通过DelayQueue(优先级队列实现,队列里的每个元素,都是某一个具体时间的list) + 环形数组(数组的每个元素是个list,索引代表时间格)
DelayQueue会根据环形数组中的每个元素进行排序;
添加任务时,判断任务执行时间,加入环形数组中,对应的环形数组的元素(list),加入DelayQueue中。,
Last updated