使用的go版本为 go1.21.2
首先我们写一个简单的chan调度代码
package main import "fmt" func main() { ch := make(chan struct{}) go func() { ch <- struct{}{} ch <- struct{}{} }() fmt.Println("xiaochuan", <-ch) data, ok := <-ch fmt.Println("xiaochuan", data, ok) close(ch) }
因为ch的数据获取方式有两种,所以这个示例代码写了两次的ch读与写
老样子通过go build -gcflags -S main.go获取到对应的汇编代码
调度make最终被转换为CALL runtime.makechan
调度ch <- struct{}{}最终被转换为CALL runtime.chansend1 由于我们调度了两次所以这里有两个
调度 <-ch 最终被转换为CALL runtime.chanrecv1
我们还进行一次两个参数的调度接收ch读取
data, ok := <-ch最终被转换为CALL runtime.chanrecv2
调度 close(ch) 最终被转换为CALL runtime.closechan 先来看一下hchan构造体相关的底层源码
//代码位于 GOROOT/src/runtime/chan.go L:33 type hchan struct { qcount uint // 环形队列中元素个数 dataqsiz uint // 环形队列的大小 buf unsafe.Pointer // 指向大小为 dataqsiz 的数组 elemsize uint16 // 元素大小 closed uint32 // 是否关闭 elemtype *_type // 元素类型 sendx uint // 发送索引 recvx uint // 接收索引 recvq waitq // recv 等待列表,即( <-ch ) sendq waitq // send 等待列表,即( ch<- ) lock mutex // 锁 } type waitq struct { // 等待队列 sudog 双向队列 first *sudog last *sudog } type sudog struct { // 下面的字段由 sudog 阻塞的 channel 的 hchan.lock 保护。 // shrinkstack 依赖这个字段来处理参与 channel 操作的 sudog。 g *g next *sudog prev *sudog elem unsafe.Pointer // 数据元素(可能指向堆栈) // 下面的字段在任何情况下都不会并发访问。 // 对于 channels,waitlink 只有 g 访问。 // 对于 semaphores,所有字段(包括上面的字段) // 仅在持有 semaRoot 锁时才会访问。 acquiretime int64 releasetime int64 ticket uint32 // isSelect 表示 g 参与了 select,因此 g.selectDone 必须进行 CAS 操作以赢得唤醒竞争。 isSelect bool // success 表示通信是否成功。如果 goroutine 被唤醒是因为在通道 c 上传递了值,则为 true, // 如果是因为 c 被关闭而唤醒,则为 false。 success bool parent *sudog // semaRoot 二叉树 waitlink *sudog // g.waiting 列表或 semaRoot waittail *sudog // semaRoot c *hchan // channel }
先从创建chan开始
//代码位于 GOROOT/src/runtime/chan.go L:65 //如果我们make的初始化缓冲区比较大会调度这个函数 func makechan64(t *chantype, size int64) *hchan { //将size强转为int类型 //因为go的int类型的大小在不同平台上可能是 32 位或 64 位 //如果大小超过了当前平台int最大值,会截断掉超出最大值的部分 if int64(int(size)) != size { panic(plainError("makechan: size out of range")) } //强制转换为int类型超出int部分截断 return makechan(t, int(size)) } func makechan(t *chantype, size int) *hchan { elem := t.Elem //编辑器检测元素的大小会不会大于2的16次方,对齐方式 if elem.Size_ >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { throw("makechan: bad alignment") } //检测内存大小,会不会有溢出的情况 mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } //初始化hchan var c *hchan switch { case mem == 0: //队列或元素大小为零 // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.PtrBytes == 0: //元素不包含指针(在调用中分配 hchan 和 buf) // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: //元素包含指针 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } //填充元素大小、元素类型、数据环形队列的大小 c.elemsize = uint16(elem.Size_) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { //开启debug开关,公屏打印 print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n") } return c }
//代码位于 GOROOT/src/runtime/chan.go L:142 //c <- x 调度这个函数 func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) } func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { //判断当前ch是不是一个空指针,如果为空将当前G休眠,触发崩溃 if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) throw("unreachable") } if debugChan { //开启debug开关,公屏打印 print("chansend: chan=", c, "\n") } if raceenabled {//竞争开启 racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend)) } //在无锁的情况下,检测一下是否ch 是否关闭,是否会造成阻塞 if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //获取chan锁 if c.closed != 0 { // 二次确认chan是不是已经关闭 unlock(&c.lock) panic(plainError("send on closed channel")) } //判断当前ch是否存在接收方 //如果存在直接调用send函数将数据发送给对方,避免数据复制到缓存区中去 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } //判断当前ch元素个数是否小于队列的长度 //如果有剩余空间将数据将要发送的元素加入队列 if c.qcount < c.dataqsiz { // 获取环形队列中的元素 qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } // 直接ep复制给qp typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() //获取当前G //获取一个sudog, 优先从P中获取 //如果P中的sudog缓存区(本地无锁)为空 //从调度器层的sudog缓冲区(全局需要加锁)中拿数据放入P的sudog缓存区 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } //将sudog写入send环形队列中去 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) //将当前G的parkingOnChan设置为true(表示目前停止在了chansend或chanrecv上) //将当前的G移出调度队列(调度chanparkcommit解锁当前ch) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) //调度KeepAlive函数确保发送的元素处于一个可达的状态避免被回收 KeepAlive(ep) //当前后续唤醒G //判断G的等待列表是否为当前的sudog //如果不一致说明G已经被改写了 if mysg != gp.waiting { throw("G waiting list is corrupted") } //清空G的等待队列, //获取当前被唤醒的原因sudog.succes //因为唤醒方式有两种,1。通道关闭 2.接收唤起 gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil //清空G的参数列表 if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) //释放sudog重新放回P的sudogcache(本地) if closed { //由于不能写入关闭的chan,所以直接异常了 if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
直接发送的时候调用的send函数解读如下
//代码位于 GOROOT/src/runtime/chan.go L:295 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // 检测数据是否为空 // 如果不为空直接调用sendDirect函数发送数据,然后将其重置为nil if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } //获取等待列表中的G, //将当前的ch解锁, sugo赋值为G当做启动参数 gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true //sugo判断释放时间是否为0 //为0将其设置为当前 CPU 的时钟滴答数 if sg.releasetime != 0 { sg.releasetime = cputicks() } //将G标记为可运行状态,放入调度队列等待被后续调度 goready(gp, skip+1) }
//代码位于 GOROOT/src/runtime/chan.go L:442 //chanrecv1与chanrecv2的处理逻辑基本差不多 //chanrecv2多接受了一个变量而已 //可以理解为这样ok := chanrecv2(ch, v) func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return } func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if debugChan {//开启debug开关,公屏打印 print("chanrecv: chan=", c, "\n") } if c == nil {//判断当前ch是不是为空指针,如果为空将当前G休眠,触发崩溃 if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) throw("unreachable") } if !block && empty(c) {//非阻塞情况下, 且数据队列为空 if atomic.Load(&c.closed) == 0 { //原子读取 当前ch是否关闭,如果关闭直接返回 // Because a channel cannot be reopened, the later observation of the channel // being not closed implies that it was also not closed at the moment of the // first observation. We behave as if we observed the channel at that moment // and report that the receive cannot proceed. return } if empty(c) {// 重新检测是否为空ch // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //获取chan锁 if c.closed != 0 { // 二次确认ch是不是已经关闭 if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } else { // 判断当前ch是否存在发送方 // 如果存在直接调用recv函数将数据接受对方的数据 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } } //环形队列中存在数据,直接从队列中接收,传递给接受者 if c.qcount > 0 { // 获取环形队列中的元素 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { // 直接qp复制给ep typedmemmove(c.elemtype, ep, qp) } //清除数据 typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } gp := getg()//获取当前G //获取一个sudog, 优先从P中获取 //如果P中的sudog缓存区(本地无锁)为空 //从调度器层的sudog缓冲区(全局需要加锁)中拿数据放入P的sudog缓存区 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } //将sudog写入recvq环形队列中去 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) //将当前G的parkingOnChan设置为true(表示目前停止在了chansend或chanrecv上) //将当前的G移出调度队列(调度chanparkcommit解锁当前ch) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) //当前后续唤醒G //判断G的等待列表是否为当前的sudog //如果不一致说明G已经被改写了 if mysg != gp.waiting { throw("G waiting list is corrupted") } //清空G的等待队列, //获取当前被唤醒的原因sudog.succes //因为唤醒方式有两种,1。通道关闭 2.发送唤起 gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg)//释放sudog重新放回P的sudogcache(本地) return true, success }
直接读取的时候调用的recv函数解读如下
//代码位于 GOROOT/src/runtime/chan.go L:616 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { //判断当前环形队列是否为0 //为0从发送方复制数据(调度recvDirect函数) if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 获取环形队列中的元素 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // 如果数据不为空 直接ep复制给qp if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清除数据 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } //获取等待列表中的G, //将当前的ch解锁, sugo赋值为G当做启动参数 sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true //sugo判断释放时间是否为0 //为0将其设置为当前 CPU 的时钟滴答数 if sg.releasetime != 0 { sg.releasetime = cputicks() } //将G标记为可运行状态,放入调度队列等待被后续调度 goready(gp, skip+1) }
//代码位于 GOROOT/src/runtime/chan.go L:358 func closechan(c *hchan) { if c == nil {//如果ch未初始化直接报错 panic(plainError("close of nil channel")) } lock(&c.lock) //获取chan锁 if c.closed != 0 { //如果当前ch已经处于关闭状态,触发异常 unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { //竞争开启 callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) racerelease(c.raceaddr()) } c.closed = 1 //将当前ch设置为关闭状态 //待唤醒的G列表 var glist gList // release all readers for { //逐步从读取队列取值,直到获取完为止 sg := c.recvq.dequeue() if sg == nil { break } //数据不为空,释放掉对应的内存块 if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } // 重置释放时间 if sg.releasetime != 0 { sg.releasetime = cputicks() } // 获取对应的G, 重置唤醒参数 // 将这个G加入到glist中等待后续唤醒 gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } for { //逐步从发送队列取值,直到获取完为止 (向关闭的ch发送数据会有panic) sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil // 重置释放时间 if sg.releasetime != 0 { sg.releasetime = cputicks() } // 获取对应的G, 重置唤醒参数 // 将这个G加入到glist中等待后续唤醒 gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // 循环glist待唤醒列表将G设置为read状态(唤醒G运行干活) for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
我们从上面的源码分析了解chan的数据结构、发送数据、接收数据和关闭这些基本操作,从源码分析我们得知chan的读写操作是会上锁的,如果业务中对性能要求比较高的情况下chan的这把锁会成为我们系统内的瓶颈。