copy Copy chevron-down
Go Context和Channel context、channel、select 原理
[TOC]
一个接口,包含如下方法,主要用于实现主协程对子协程的控制,作用包括取消执行、设置超时时间、携带键值对等
Copy type Context interface {
// 获取到期时间,如果没有,ok则返回false
Deadline () ( deadline time . Time , ok bool )
// 返回一个chan,表示取消信号,如果通道关闭则代表该 Context 已经被取消;如果返回的为 nil,则代表该 Context 是一个永远不会被取消的 Context。
Done () <- chan struct {}
// 返回该 Context 被取消的原因。如果只使用 Context 包的 Context 类型的话,那么只可能返回 Canceled (代表被明确取消)或者 DeadlineExceeded (因超时而取消)
Err () error
// 获取Context中的键值对
Value ( key interface {}) interface {}
} 一个demo,引用:通知多个子goroutine退出运行arrow-up-right
Copy package main
import (
" context "
" crypto/md5 "
" fmt "
" io/ioutil "
" net/http "
" sync "
" time "
)
type favContextKey string
func main () {
wg := & sync . WaitGroup {}
values := [] string { " https://www.baidu.com/ " , " https://www.zhihu.com/ " }
ctx , cancel := context . WithCancel ( context . Background ())
for _ , url := range values {
wg . Add ( 1 )
subCtx := context . WithValue ( ctx , favContextKey ( " url " ), url )
go reqURL ( subCtx , wg )
}
go func () {
time . Sleep ( time . Second * 3 )
cancel ()
}()
wg . Wait ()
fmt . Println ( " exit main goroutine " )
}
func reqURL ( ctx context . Context , wg * sync . WaitGroup ) {
defer wg . Done ()
url , _ := ctx . Value ( favContextKey ( " url " )).( string )
for {
select {
// 调用Done方法检测是否有父节点调用cancel方法通知子节点退出运行, chan被close时触发
case <- ctx . Done ():
fmt . Printf ( " stop getting url: %s \n " , url )
return
default :
r , err := http . Get ( url )
if r . StatusCode == http . StatusOK && err == nil {
body , _ := ioutil . ReadAll ( r . Body )
subCtx := context . WithValue ( ctx , favContextKey ( " resp " ), fmt . Sprintf ( " %s%x " , url , md5 . Sum ( body )))
wg . Add ( 1 )
go showResp ( subCtx , wg )
}
r . Body . Close ()
// 启动子goroutine是为了不阻塞当前goroutine,这里在实际场景中可以去执行其他逻辑,这里为了方便直接sleep一秒
// doSometing()
time . Sleep ( time . Second * 1 )
}
}
}
func showResp ( ctx context . Context , wg * sync . WaitGroup ) {
defer wg . Done ()
for {
select {
case <- ctx . Done ():
fmt . Println ( " stop showing resp " )
return
default :
// 子goroutine里一般会处理一些IO任务,如读写数据库或者rpc调用,这里为了方便直接把数据打印
fmt . Println ( " printing: " , ctx . Value ( favContextKey ( " resp " )))
time . Sleep ( time . Second * 1 )
}
}
} go提供了两个基本的context创建,emptyCtx是int类型的重新定义,emptyCtx没有过期时间,不能被取消,不能设置value,作用仅作为context树的根节点。
通过根context,比如emptyCtx之后,调用withCancel()方法可以创建cancelCtx用于取消操作。
有两种方式可触发取消:
返回的CancelFunc被调用,此时会取消当前context和其所有的子context
Done这个chan被close了,此时会取消当前context和其所有的子context
可被取消的context实现了canceler接口,具体实现:
可超时自动取消的context,内部使用cancelCtx + timer实现,调用WithDeadline()方法可以创建timerCtx用于超时取消操作。
WithTimeout()方法和WithDeadline()方法,效果是一样的,只是时间的含义不一样。
关于withTimeout()方法,返回的cancel函数,即使不主动调用,也不影响资源的最终释放,它到时间了也会自动调用,建议是提前主动调用,尽快释放,避免等待时间过长导致浪费。
valueCtx内部仍然使用Context存储父Context的指针,并用interface{}存储键值;
如果当前valueCtx找不到需要的key,会沿着树向上一直查找直到根节点,类似链表的搜索;
使用WithValue创建时,会判断key是否实现Comparable接口。如果没有实现,会触发panic;
key的类类型不应该是内置类型,以避免冲突。使用的时候应该自定义类型;
Channel的设计基于CSP模型。
CSP模型(Communicating Sequential Process,通信顺序进程),允许使用进程组来描述系统,独立运行,并且只通过消息传递的方式通信。
本质上就是,在使用协程执行函数时,不通过内存共享(会用到锁)的方式通信,而是通过Channel通信传递数据。
动画参考:https://go.xargin.com/docs/data_structure/channel/
chan是引用类型,使用make关键字创建,未初始化时的零值是nil,如
ch := make(chan string, 10),创建一个能处理string的缓冲区大小为10的channel,效果相当于异步队列,除非缓冲区用完,否则不会阻塞;
ch := make(chan string),则创建了一个不存在缓冲区的channel,效果相当于同步阻塞队列,len永远返回0。
即 假如没有接收者,同一个方法内,连续发送两次数据,第一次如果没有被接收的话,此时就阻塞了,轮不到第二次发送,但如果size = 1,第一次发送的数据就会进入buf数组,不阻塞,到了第二次发送才阻塞;
var ch chan int表示创建了一个nil channel;
channel作为通道,负责在多个goroutine间传递数据,解决多线程下共享数据竞争问题。
len()方法获取buff中未被读取的数量,即qcount的值;
cap()方法获取buff数组的长度
带有 <- 的chan是有方向的,不带 <- 的chan是双向的,比如
chan可以是任何类型的,比如可以是 chan<- 类型,<-总是尽量和左边的chan结合,比如
接收数据时可以有两个返回值,第一个是返回的元素,第二个是bool类型,表示是否成功地从chan中读取到一个值。如果是false,说明chan已经被close并且chan中没有缓存的数据,此时第一个元素是零值。所以,如果接收时第一个元素是零值,可能是sender真的发送了零值,也可能是closed并且没有元素导致的,所以最好通过第二个返回值来确定。
双向chan可以赋值给单向chan,但反过来不可以;
给一个nil channel发送数据,会造成永久阻塞,从一个nil channel接收数据,会造成永久阻塞;
给一个已经关闭的channel发送数据,会引起panic;
从一个已经关闭的channel接收数据,如果缓冲区为空,则返回一个零值;
对于一个不关闭的channel,在方法结束后,只要channel没有被引用,会被GC自动回收;
关闭channel的原则:不要向已关闭的channel发送数据或者再次关闭,关闭的动作尽量在sender做,主要还是分场景:
一个sender一个receiver的场景:在sender处关闭。
一个sender多个recevier的场景:在sender处关闭。
多个sender一个receiver的场景:增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。
多个sender多个receiver的场景:再增加一个中间的channel,用来接收标识关闭的数据,收到后直接close传递关闭的信号channel即可。
注意channel不提供跨goroutine的数据保护,如果多个channel传递一份数据的指针,使得每个goroutine可以操作同一份数据,也会出现并发安全问题;
nil
empty
full
not full & empty
closed
使用ch <- "test"发送数据,最终会调用chansend()函数发送数据,该函数设置了阻塞参数为true;
如果chan是nil,则把发送者的goroutine park(阻塞休眠),此时发送者将被永久阻塞;
如果chan没有被close,但是chan满了,则直接返回false,但是由于阻塞参数为true,这部分不会被执行;
上锁 ,保证线程安全,再次检查chan是否被close,如果被close,再往里发数据会触发 解锁,panic ;
同步发送 - 优先发送给等待接收的G
如果没被close,当recvq存在等待的接收者时 ,通过send()函数,取出第一个等待的goroutine,直接发送数据,不需要先放到buf中;
send()函数将因为等待数据的接收而阻塞的goroutine的状态从Gwaiting或者Gscanwaiting改为Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度 时会立即执行这个等待发送数据的goroutine;
异步发送 - 其次是发送到buf区
当recvq中没有等待的接收者,且buf区存在空余空间时 ,会使用chanbuf()函数获取sendx索引值 ,计算出下一个可以存储数据的位置,然后调用typedmemmove()函数将要发送的数据拷贝到buff区,增加sendx索引和qcount计数器,完成之后解锁,返回成功;
阻塞发送 - 最后才保存在待发送队列,阻塞(阻塞只发生在这里,此时G和M分离)
当recvq中没有等待的接收者,且buf区已满或不存在buf区时 ,会先调用getg()函数获取正在发送者的goroutine,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)
然后将该sudoG对象加入sendq队列,调用goparkunlock()函数让当前发送者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中接收数据,等待调度器唤醒;
此时len()返回值为0,数据的发送是阻塞在方法中的。
调度器唤醒后,将一些属性值设置为零,并释放sudog对象,表示向channel发送数据结束;
channel发送数据时涉及两次goroutine的调度 :
当接收队列里存在sudoG可以直接发送数据时,执行goready()函数,将G从Gwaiting或GScanwaiting转为Grunnable,等待下次调度触发,交由M执行;
当没有等待接收数据的G,并且没有缓冲区,或者缓冲区已满时,执行gopark()函数挂起当前G,将G阻塞,此时状态为Gwaiting,让出CPU等待调度器调度;
使用 str <- ch 或 str, ok <- ch (ok用于判断ch是否关闭,如果没有ok,可能会无法分辨str接收到的零值是发送者发的还是ch关闭)接收数据,会转化为调用chanrecv1和chanrecv2函数,但最终会调用chanrecv函数接收数据。chanrecv1和chanrecv2函数都是设置阻塞参数为true。
如果chan是nil,则把接收者的goroutine park(阻塞休眠),接收者被永久阻塞;
不上锁检查 buf 区大小:如果chan的buf区大小为0 或者 没有数据可接收,检查是否被关闭,被关闭则返回;如果没被关闭,则再次检查buf区大小是否为0 或者 没有数据可接收,如果是,则清除ep指针中的数据并返回selected为true,received为false;
这里两次empty检查,因为第一次检查,chan可能还没关闭,但是第二次检查时关闭了,由于可能在两次检查之间有待接收的数据达到了,所以需要两次empty检查;
上锁 检查buf区大小:上锁,如果chan已经被close,且buf区没有数据,清除ep指针中的数据,解锁,返回selected为true,received为false;
同步接收 - 如果无buf,消费发送等待队列中G的数据,如果buf满,先拿buf区的,发送的再加入
当chan的sendq队列存在等待状态的goroutine时 (能拿到就说明要不就是buf区为0,要不就是buf区已满)
如果是无buf区的chan ,直接使用recv()函数从阻塞的发送者中获取数据;
如果是有buf区的chan ,说明此时buf区已满,则先从buf区中获取可接收的数据(从buf区中copy到接收者的内存),然后从sendq队列的队首中读取待发送的数据,加入到buf区中(将发送者的数据copy到buf区,替换刚刚buf区copy出去的位置),更新可接收和可发送的下标chan.recvx和sendx的值;
最后调用goready()函数将等待发送数据而阻塞gorouotine的状态从Gwaiting 或者 Gscanwaiting 改变成 Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时 立即释放这个等待发送数据的goroutine;
异步接收 - 其次是消费buf区中的数据
当channel的sendq队列没有等待状态的goroutine,且buf区存在数据时 ,从channel的buf区中的recvx的索引位置接收数据,如果接收数据的内存地址不为空,会直接将缓冲区里的数据拷贝到内存中,清除buf区中的数据,递增recvx,递减qcount,完成数据接收;
这个和chansend共用一把锁,所以不会有并发问题;
阻塞接收 - 最后才是保存在接收等待队列,阻塞(阻塞只发生在这里,此时G和M分离)
当channel的sendq队列没有等待状态的goroutine,且buf区不存在数据时 ,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)
然后将该sudoG对象加入待发送recvq队列,调用goparkunlock()函数让当前接收者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中发送数据,等待调度器唤醒;
此时方法会阻塞在ch的接收中,len()返回值为0;
goroutine被唤醒后,chan完成阻塞数据的接收,接收完成后进行基本的参数检查,解除chan的绑定,释放sudoG,表示接收数据完成;
channel 接收过程中包含 2 次有关 goroutine 调度过程 :
当发送队列中存在 sudoG 时,调用goready(),G 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable,等待下次调度便立即运行;
当 buf 区为空,且没有发送者时,调用 gopark()挂起当前G,此时状态为Gwaiting,让出 cpu 的使用权并等待调度器的调度;
如果 chan 为 nil,close 会 panic;
上锁:
如果 chan 已经 closed,再次 close 也会 panic;
否则的话,如果 chan 不为 nil,chan 也没有 closed,设置chan的标记为closed;
优先释放所有的接收者 :
将接收者等待队列中的sudoG对象加入到待清除队列glist中,这里会优先回收接收者,这样即使从close中的chan读取数据,也不会panic,最多读到默认值;
这样第6步执行的时候,才会先执行接收者,接收后面发送者的数据(接收buff数组里的数据,因为sender里的会被panic掉),否则发送者发送的数据无法被先接收。
其次是释放所有发送者 : 将发送者等待队列中的sudoG对象加入到待清除队列glist中,这里可能会发生panic,因为往一个close的chan中发送数据会panic;
进行最后的调度,遍历glist中的sudoG,调用goready()触发调度,将每个goroutine状态从 Gwaiting 转为 Grunnable状态,等待调度器调度;
信号通知:利用 如果chan为空,那receiver接收数据的时候就会阻塞等待,直到chan被关闭或有新数据进来 的特点,将一个协程将信号(closing、closed、data ready等)传递给另一个或者另一组协程,比如 wait/notify的模式。
协程池,把要操作的逻辑封装成task,通过chan传输实现协程复用
任务编排:让一组协程按照一定的顺序并发或串行执行,比如实现waitGroup的功能
控制并发量,可以配合WaitGroup进行控制goroutine的数量
实现互斥锁的机制,比如,容量为 1 的chan,放入chan的元素代表锁,谁先取得这个元素,就代表谁先获取了锁
共享资源的并发访问使用传统并发原语;
复杂的任务编排和消息传递使用 Channel;
消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
需要和 Select 语句结合,使用 Channel;需要和超时配合时,使用 Channel 和 Context。
注意点:使用chan要注意panic和goroutine泄露,另外,只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。
在使用chan和select配合时要注意会出现goroutine泄漏的情况:
select在runtime中不存在结构体表示,但是case倒是有
非阻塞收发:当chan中存在可接收数据,直接处理那个chan,否则执行default语句
随机执行:select遇到多个case就绪,会进行随机选择
编译器会对select中的case进行优化,总共有四种情况:
不包含任何case,即空select,此时会阻塞当前的goroutine
只包含一个case,此时select会被优化成 if ,当chan没有数据可接收时,就会把阻塞当前goroutine,直到有数据到来;如果chan是nil,就会永远阻塞当前goroutine
存在两个case,其中一个是default:
接收,这种情况下,chan有值就走case,否则走default
通用的select条件:比如select里包含多个case,会编译成通过runtime的selectgo方法处理case,selectgo会返回 case的序号 还有 是否被接收的标识,然后被编译成多个if,用于判断选中哪个case。
获取case数组,随机打乱,确定打乱后的轮询顺序数组pollorder和加锁顺序数组lockorder,数组里存放的元素是chan
第一阶段 ,查找是否已经存在准备就绪的chan(此时的chan可以执行收发操作)此时需要处理四种类型的case:
如果当前chan的sendq队列上有等待的goroutine,就会跳到 recv标签,如果没有buf区,则从sendq队列上获取数据,否则,从chan的buf区读取数据后,将sendq队列中等待的goroutine中的数据放入到buf区中相同的位置;
如果当前chan的buf区不为空,就跳到bufrecv标签,从chan的buf区中获取数据
如果当前chan已经被关闭,就会跳到 rclose标签 做一些清除的收尾工作;
如果当前chan已经被关闭,会直接跳到 sclose标签,触发panic;
如果当前chan的recvq队列上有等待的goroutine,就跳到 send标签 向chan发送数据;
如果当前chan的缓冲区存在空闲位置,就会将等待发送的数据存入缓冲区中,因为select相当于有接收者了,不会出现发送阻塞的情况;
当case是default时,表示前面的所有case都没有被执行,此时会解锁所有的chan并返回(意味着当前select结构的收发都是非阻塞的),直接执行default内容;
第一阶段只是查找所有case中是否有可以立即被处理的chan,无论是数据是在等待的goroutine上,还是buf区中,只要存在数据满足条件就会立即处理,然后返回;如果不能立刻找到活跃的chan,就会进入下一循环;
第二阶段 ,将当前goroutine加入到chan对应的收发队列上并等待其他goroutine的唤醒:
将当前goroutine,包装成sudogo,遍历case,加入到case的chan的sendq队列或者recvq队列中(同时,这个sudog会关联当前case的chan,然后将这些sudog组成链表,挂在当前goroutine下,用于唤醒之后的查找)
调用gopark函数挂起当前goroutine,等待被调度器唤醒;
第三阶段 ,当前goroutine被唤醒后,找到满足条件的chan并进行处理:
等到select对应的chan准备好后,当前goroutine会被调度器唤醒,被唤醒后,获取当前goroutine的sudog,依次对比所有case里chan对应的sudog结构,找到被唤醒的case,并释放其他未被使用的sudog结构;
由于当前的select结构已经被挑选了其中一个case执行,剩下的case中没有被用到的sudog会被直接忽略并释放掉,为了不影响chan的正常使用,还需要将这些废弃的sudog从chan中出队;
https://www.cyhone.com/articles/analysis-of-golang-timer/
概念理解:一张图理解Kafka时间轮arrow-up-right
手把手教你如何用golang实现一个timewheel时间轮arrow-up-right
Go语言中时间轮的实现arrow-up-right
总结:
通过DelayQueue(优先级队列实现,队列里的每个元素,都是某一个具体时间的list) + 环形数组(数组的每个元素是个list,索引代表时间格)
DelayQueue会根据环形数组中的每个元素进行排序;
添加任务时,判断任务执行时间,加入环形数组中,对应的环形数组的元素(list),加入DelayQueue中。,