Go Context和Channel

context、channel、select 原理

[TOC]

Context

一个接口,包含如下方法,主要用于实现主协程对子协程的控制,作用包括取消执行、设置超时时间、携带键值对等

type Context interface {
    // 获取到期时间,如果没有,ok则返回false
	Deadline() (deadline time.Time, ok bool)
	// 返回一个chan,表示取消信号,如果通道关闭则代表该 Context 已经被取消;如果返回的为 nil,则代表该 Context 是一个永远不会被取消的 Context。
    Done() <-chan struct{}
	// 返回该 Context 被取消的原因。如果只使用 Context 包的 Context 类型的话,那么只可能返回 Canceled (代表被明确取消)或者 DeadlineExceeded (因超时而取消)
    Err() error
    // 获取Context中的键值对
	Value(key interface{}) interface{}
}

一个demo,引用:通知多个子goroutine退出运行

 package main
 
 import (
 	"context"
 	"crypto/md5"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"sync"
	"time"
)

type favContextKey string

func main() {
	wg := &sync.WaitGroup{}
	values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"}
	ctx, cancel := context.WithCancel(context.Background())

	for _, url := range values {
		wg.Add(1)
		subCtx := context.WithValue(ctx, favContextKey("url"), url)
		go reqURL(subCtx, wg)
	}

	go func() {
		time.Sleep(time.Second * 3)
		cancel()
	}()

	wg.Wait()
	fmt.Println("exit main goroutine")
}

func reqURL(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	url, _ := ctx.Value(favContextKey("url")).(string)
	for {
		select {
        // 调用Done方法检测是否有父节点调用cancel方法通知子节点退出运行, chan被close时触发
		case <-ctx.Done():
			fmt.Printf("stop getting url:%s\n", url)
			return
		default:
			r, err := http.Get(url)
			if r.StatusCode == http.StatusOK && err == nil {
				body, _ := ioutil.ReadAll(r.Body)
				subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body)))
				wg.Add(1)
				go showResp(subCtx, wg)
			}
			r.Body.Close()
			//启动子goroutine是为了不阻塞当前goroutine,这里在实际场景中可以去执行其他逻辑,这里为了方便直接sleep一秒
			// doSometing()
			time.Sleep(time.Second * 1)
		}
	}
}

func showResp(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop showing resp")
			return
		default:
			//子goroutine里一般会处理一些IO任务,如读写数据库或者rpc调用,这里为了方便直接把数据打印
			fmt.Println("printing: ", ctx.Value(favContextKey("resp")))
			time.Sleep(time.Second * 1)
		}
	}
}

emptyCtx

go提供了两个基本的context创建,emptyCtx是int类型的重新定义,emptyCtx没有过期时间,不能被取消,不能设置value,作用仅作为context树的根节点。

cancelCtx

通过根context,比如emptyCtx之后,调用withCancel()方法可以创建cancelCtx用于取消操作。

有两种方式可触发取消:

  1. 返回的CancelFunc被调用,此时会取消当前context和其所有的子context

  2. Done这个chan被close了,此时会取消当前context和其所有的子context

可被取消的context实现了canceler接口,具体实现:

timerCtx

可超时自动取消的context,内部使用cancelCtx + timer实现,调用WithDeadline()方法可以创建timerCtx用于超时取消操作。

WithTimeout()方法WithDeadline()方法,效果是一样的,只是时间的含义不一样。

关于withTimeout()方法,返回的cancel函数,即使不主动调用,也不影响资源的最终释放,它到时间了也会自动调用,建议是提前主动调用,尽快释放,避免等待时间过长导致浪费。

valueCtx

valueCtx内部仍然使用Context存储父Context的指针,并用interface{}存储键值;

如果当前valueCtx找不到需要的key,会沿着树向上一直查找直到根节点,类似链表的搜索;

使用WithValue创建时,会判断key是否实现Comparable接口。如果没有实现,会触发panic

key的类类型不应该是内置类型,以避免冲突。使用的时候应该自定义类型;

Channel

Channel的设计基于CSP模型。

CSP模型(Communicating Sequential Process,通信顺序进程),允许使用进程组来描述系统,独立运行,并且只通过消息传递的方式通信。

本质上就是,在使用协程执行函数时,不通过内存共享(会用到锁)的方式通信,而是通过Channel通信传递数据。

动画参考:https://go.xargin.com/docs/data_structure/channel/

基本

  • chan是引用类型,使用make关键字创建,未初始化时的零值是nil,如

    ch := make(chan string, 10),创建一个能处理string的缓冲区大小为10的channel,效果相当于异步队列,除非缓冲区用完,否则不会阻塞;

    ch := make(chan string),则创建了一个不存在缓冲区的channel,效果相当于同步阻塞队列,len永远返回0。

    即 假如没有接收者,同一个方法内,连续发送两次数据,第一次如果没有被接收的话,此时就阻塞了,轮不到第二次发送,但如果size = 1,第一次发送的数据就会进入buf数组,不阻塞,到了第二次发送才阻塞;

    var ch chan int表示创建了一个nil channel;

  • channel作为通道,负责在多个goroutine间传递数据,解决多线程下共享数据竞争问题。

  • len()方法获取buff中未被读取的数量,即qcount的值;

    cap()方法获取buff数组的长度

  • 带有 <- 的chan是有方向的,不带 <- 的chan是双向的,比如

  • chan可以是任何类型的,比如可以是 chan<- 类型,<-总是尽量和左边的chan结合,比如

  • 接收数据时可以有两个返回值,第一个是返回的元素,第二个是bool类型,表示是否成功地从chan中读取到一个值。如果是false,说明chan已经被close并且chan中没有缓存的数据,此时第一个元素是零值。所以,如果接收时第一个元素是零值,可能是sender真的发送了零值,也可能是closed并且没有元素导致的,所以最好通过第二个返回值来确定。

  • 双向chan可以赋值给单向chan,但反过来不可以;

  • 给一个nil channel发送数据,会造成永久阻塞,从一个nil channel接收数据,会造成永久阻塞;

  • 给一个已经关闭的channel发送数据,会引起panic;

  • 从一个已经关闭的channel接收数据,如果缓冲区为空,则返回一个零值;

  • 已关闭的channel再次关闭,会panic;

  • 对于一个不关闭的channel,在方法结束后,只要channel没有被引用,会被GC自动回收;

  • 关闭channel的原则:不要向已关闭的channel发送数据或者再次关闭,关闭的动作尽量在sender做,主要还是分场景:

    • 一个sender一个receiver的场景:在sender处关闭。

    • 一个sender多个recevier的场景:在sender处关闭。

    • 多个sender一个receiver的场景:增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。

    • 多个sender多个receiver的场景:再增加一个中间的channel,用来接收标识关闭的数据,收到后直接close传递关闭的信号channel即可。

  • channel在关闭时会自动退出循环;

  • 注意channel不提供跨goroutine的数据保护,如果多个channel传递一份数据的指针,使得每个goroutine可以操作同一份数据,也会出现并发安全问题;

nil
empty
full
not full & empty
closed

receive

block

block

read value

read value

返回未读的元素,读完后返回零值

send

block

write value

block

writed value

panic

close

panic

closed,没有未读元素

closed,保留未读元素

closed,保留未读元素

panic

数据结构

初始化

发送数据

使用ch <- "test"发送数据,最终会调用chansend()函数发送数据,该函数设置了阻塞参数为true;

  1. 如果chan是nil,则把发送者的goroutine park(阻塞休眠),此时发送者将被永久阻塞;

  2. 如果chan没有被close,但是chan满了,则直接返回false,但是由于阻塞参数为true,这部分不会被执行;

  3. 上锁,保证线程安全,再次检查chan是否被close,如果被close,再往里发数据会触发 解锁,panic

  4. 同步发送 - 优先发送给等待接收的G

    如果没被close,当recvq存在等待的接收者时,通过send()函数,取出第一个等待的goroutine,直接发送数据,不需要先放到buf中;

    send()函数将因为等待数据的接收而阻塞的goroutine的状态从Gwaiting或者Gscanwaiting改为Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时会立即执行这个等待发送数据的goroutine;

  5. 异步发送 - 其次是发送到buf区

    当recvq中没有等待的接收者,且buf区存在空余空间时,会使用chanbuf()函数获取sendx索引值,计算出下一个可以存储数据的位置,然后调用typedmemmove()函数将要发送的数据拷贝到buff区,增加sendx索引和qcount计数器,完成之后解锁,返回成功;

  6. 阻塞发送 - 最后才保存在待发送队列,阻塞(阻塞只发生在这里,此时G和M分离)

    当recvq中没有等待的接收者,且buf区已满或不存在buf区时,会先调用getg()函数获取正在发送者的goroutine,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)

    然后将该sudoG对象加入sendq队列,调用goparkunlock()函数让当前发送者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中接收数据,等待调度器唤醒;

    此时len()返回值为0,数据的发送是阻塞在方法中的。

    调度器唤醒后,将一些属性值设置为零,并释放sudog对象,表示向channel发送数据结束;

channel发送数据时涉及两次goroutine的调度

  1. 当接收队列里存在sudoG可以直接发送数据时,执行goready()函数,将G从Gwaiting或GScanwaiting转为Grunnable,等待下次调度触发,交由M执行;

  2. 当没有等待接收数据的G,并且没有缓冲区,或者缓冲区已满时,执行gopark()函数挂起当前G,将G阻塞,此时状态为Gwaiting,让出CPU等待调度器调度;

接收数据

使用 str <- ch 或 str, ok <- ch (ok用于判断ch是否关闭,如果没有ok,可能会无法分辨str接收到的零值是发送者发的还是ch关闭)接收数据,会转化为调用chanrecv1和chanrecv2函数,但最终会调用chanrecv函数接收数据。chanrecv1和chanrecv2函数都是设置阻塞参数为true。

  1. 如果chan是nil,则把接收者的goroutine park(阻塞休眠),接收者被永久阻塞;

  2. 不上锁检查 buf 区大小:如果chan的buf区大小为0 或者 没有数据可接收,检查是否被关闭,被关闭则返回;如果没被关闭,则再次检查buf区大小是否为0 或者 没有数据可接收,如果是,则清除ep指针中的数据并返回selected为true,received为false;

    这里两次empty检查,因为第一次检查,chan可能还没关闭,但是第二次检查时关闭了,由于可能在两次检查之间有待接收的数据达到了,所以需要两次empty检查;

  3. 上锁检查buf区大小:上锁,如果chan已经被close,且buf区没有数据,清除ep指针中的数据,解锁,返回selected为true,received为false;

  4. 同步接收 - 如果无buf,消费发送等待队列中G的数据,如果buf满,先拿buf区的,发送的再加入

    当chan的sendq队列存在等待状态的goroutine时(能拿到就说明要不就是buf区为0,要不就是buf区已满)

    如果是无buf区的chan,直接使用recv()函数从阻塞的发送者中获取数据;

    如果是有buf区的chan,说明此时buf区已满,则先从buf区中获取可接收的数据(从buf区中copy到接收者的内存),然后从sendq队列的队首中读取待发送的数据,加入到buf区中(将发送者的数据copy到buf区,替换刚刚buf区copy出去的位置),更新可接收和可发送的下标chan.recvx和sendx的值;

    最后调用goready()函数将等待发送数据而阻塞gorouotine的状态从Gwaiting 或者 Gscanwaiting 改变成 Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时立即释放这个等待发送数据的goroutine;

  5. 异步接收 - 其次是消费buf区中的数据

    当channel的sendq队列没有等待状态的goroutine,且buf区存在数据时,从channel的buf区中的recvx的索引位置接收数据,如果接收数据的内存地址不为空,会直接将缓冲区里的数据拷贝到内存中,清除buf区中的数据,递增recvx,递减qcount,完成数据接收;

    这个和chansend共用一把锁,所以不会有并发问题;

  6. 阻塞接收 - 最后才是保存在接收等待队列,阻塞(阻塞只发生在这里,此时G和M分离)

    当channel的sendq队列没有等待状态的goroutine,且buf区不存在数据时,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)

    然后将该sudoG对象加入待发送recvq队列,调用goparkunlock()函数让当前接收者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中发送数据,等待调度器唤醒;

    此时方法会阻塞在ch的接收中,len()返回值为0;

    goroutine被唤醒后,chan完成阻塞数据的接收,接收完成后进行基本的参数检查,解除chan的绑定,释放sudoG,表示接收数据完成;

channel 接收过程中包含 2 次有关 goroutine 调度过程

  1. 当发送队列中存在 sudoG 时,调用goready(),G 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable,等待下次调度便立即运行;

  2. 当 buf 区为空,且没有发送者时,调用 gopark()挂起当前G,此时状态为Gwaiting,让出 cpu 的使用权并等待调度器的调度;

关闭

  1. 如果 chan 为 nil,close 会 panic;

  2. 上锁:

    如果 chan 已经 closed,再次 close 也会 panic;

    否则的话,如果 chan 不为 nil,chan 也没有 closed,设置chan的标记为closed;

  3. 优先释放所有的接收者

    将接收者等待队列中的sudoG对象加入到待清除队列glist中,这里会优先回收接收者,这样即使从close中的chan读取数据,也不会panic,最多读到默认值;

    这样第6步执行的时候,才会先执行接收者,接收后面发送者的数据(接收buff数组里的数据,因为sender里的会被panic掉),否则发送者发送的数据无法被先接收。

  4. 其次是释放所有发送者: 将发送者等待队列中的sudoG对象加入到待清除队列glist中,这里可能会发生panic,因为往一个close的chan中发送数据会panic;

  5. 解锁

  6. 进行最后的调度,遍历glist中的sudoG,调用goready()触发调度,将每个goroutine状态从 Gwaiting 转为 Grunnable状态,等待调度器调度;

应用场景

  • 实现生产者 - 消费组模型,数据传递,比如worker池的实现

  • 信号通知:利用 如果chan为空,那receiver接收数据的时候就会阻塞等待,直到chan被关闭或有新数据进来 的特点,将一个协程将信号(closing、closed、data ready等)传递给另一个或者另一组协程,比如 wait/notify的模式。

  • 协程池,把要操作的逻辑封装成task,通过chan传输实现协程复用

  • 任务编排:让一组协程按照一定的顺序并发或串行执行,比如实现waitGroup的功能

  • 控制并发量,可以配合WaitGroup进行控制goroutine的数量

  • 任务定时

  • 实现互斥锁的机制,比如,容量为 1 的chan,放入chan的元素代表锁,谁先取得这个元素,就代表谁先获取了锁

共享资源的并发访问使用传统并发原语;

复杂的任务编排和消息传递使用 Channel;

消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;

简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;

需要和 Select 语句结合,使用 Channel;需要和超时配合时,使用 Channel 和 Context。

注意点:使用chan要注意panic和goroutine泄露,另外,只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。

在使用chan和select配合时要注意会出现goroutine泄漏的情况:

select

结构

select在runtime中不存在结构体表示,但是case倒是有

基本

  • 非阻塞收发:当chan中存在可接收数据,直接处理那个chan,否则执行default语句

  • 随机执行:select遇到多个case就绪,会进行随机选择

编译器会对select中的case进行优化,总共有四种情况:

  1. 不包含任何case,即空select,此时会阻塞当前的goroutine

  2. 只包含一个case,此时select会被优化成 if ,当chan没有数据可接收时,就会把阻塞当前goroutine,直到有数据到来;如果chan是nil,就会永远阻塞当前goroutine

  1. 存在两个case,其中一个是default:

    • 发送,这种情况下,发送是不阻塞的:

    • 接收,这种情况下,chan有值就走case,否则走default

    • 通用的select条件:比如select里包含多个case,会编译成通过runtime的selectgo方法处理case,selectgo会返回 case的序号 还有 是否被接收的标识,然后被编译成多个if,用于判断选中哪个case。

selectgo的流程

  1. 获取case数组,随机打乱,确定打乱后的轮询顺序数组pollorder和加锁顺序数组lockorder,数组里存放的元素是chan

  2. 按加锁顺序数组,调用chan的锁,依次进行锁定

  3. 进入主循环,遍历 轮询顺序数组pollorder

第一阶段,查找是否已经存在准备就绪的chan(此时的chan可以执行收发操作)此时需要处理四种类型的case:

  1. 当case不包含chan时,直接跳过;

  2. 当case会从chan中接收数据时:

  • 如果当前chan的sendq队列上有等待的goroutine,就会跳到 recv标签,如果没有buf区,则从sendq队列上获取数据,否则,从chan的buf区读取数据后,将sendq队列中等待的goroutine中的数据放入到buf区中相同的位置;

  • 如果当前chan的buf区不为空,就跳到bufrecv标签,从chan的buf区中获取数据

  • 如果当前chan已经被关闭,就会跳到 rclose标签 做一些清除的收尾工作;

  1. 当case会从chan中发送数据时:

  • 如果当前chan已经被关闭,会直接跳到 sclose标签,触发panic;

  • 如果当前chan的recvq队列上有等待的goroutine,就跳到 send标签 向chan发送数据;

  • 如果当前chan的缓冲区存在空闲位置,就会将等待发送的数据存入缓冲区中,因为select相当于有接收者了,不会出现发送阻塞的情况;

  1. 当case是default时,表示前面的所有case都没有被执行,此时会解锁所有的chan并返回(意味着当前select结构的收发都是非阻塞的),直接执行default内容;

第一阶段只是查找所有case中是否有可以立即被处理的chan,无论是数据是在等待的goroutine上,还是buf区中,只要存在数据满足条件就会立即处理,然后返回;如果不能立刻找到活跃的chan,就会进入下一循环;

第二阶段,将当前goroutine加入到chan对应的收发队列上并等待其他goroutine的唤醒:

  • 将当前goroutine,包装成sudogo,遍历case,加入到case的chan的sendq队列或者recvq队列中(同时,这个sudog会关联当前case的chan,然后将这些sudog组成链表,挂在当前goroutine下,用于唤醒之后的查找)

  • 调用gopark函数挂起当前goroutine,等待被调度器唤醒;

第三阶段,当前goroutine被唤醒后,找到满足条件的chan并进行处理:

  • 等到select对应的chan准备好后,当前goroutine会被调度器唤醒,被唤醒后,获取当前goroutine的sudog,依次对比所有case里chan对应的sudog结构,找到被唤醒的case,并释放其他未被使用的sudog结构;

  • 由于当前的select结构已经被挑选了其中一个case执行,剩下的case中没有被用到的sudog会被直接忽略并释放掉,为了不影响chan的正常使用,还需要将这些废弃的sudog从chan中出队;

Timer

https://www.cyhone.com/articles/analysis-of-golang-timer/

时间轮

概念理解:一张图理解Kafka时间轮

手把手教你如何用golang实现一个timewheel时间轮

Go语言中时间轮的实现

总结:

通过DelayQueue(优先级队列实现,队列里的每个元素,都是某一个具体时间的list) + 环形数组(数组的每个元素是个list,索引代表时间格)

DelayQueue会根据环形数组中的每个元素进行排序;

添加任务时,判断任务执行时间,加入环形数组中,对应的环形数组的元素(list),加入DelayQueue中。,

Last updated