相关推荐recommended
channel源码解析
作者:mmseoamin日期:2024-02-06

csp模型介绍

csp模型是golang采用的共享内存模型,比对传统多线程共享内存采用lock、condition等方式来规定执行顺序的方式,golang里的csp更强调信道(channel),不关心信道发送、接收方是谁,双方通过信道收发信息。

**ps:**和csp模型对应的actor模型,强调通信中的角色(actor),即收发双方,不看重通道。并且角色对外不提供任何接口访问,只约定通过通信异步交换信息,发送方必须知道接收方是谁,例如一个人要给另个人传递消息通过给对方的邮箱写信或者发短信。

channel

golang的channel实现了多线程并发安全的发送和接收方法,可以轻松实现多协程的生产消费模型。

源码解析(go1.19.3:src/runtime/chan.go)

数据结构定义

// 位于源码src/runtime/chan.go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters
	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

channel实现为一个固定大小的环形缓冲区,sendx、recvx表示缓冲区的头和尾,所以channel的收发可以一直复用这块缓冲区。

  • qcount: 缓冲区当前元素个数
  • dataqsiz: 缓冲区的容量
  • buf: 缓冲区(初始分配的元素数组)
  • elemsize: 元素的内存格式长度
  • closed: channel是否已经关闭
  • eletype: 元素的类型
  • sendx: 环形缓冲区的已写入元素最大索引
  • recvx: 环形缓冲区的已写入元素最小索引
  • recvq: 阻塞接收数据的协程队列(队列不为空表示channel无数据)
  • sendq: 阻塞发送数据的协程队列(队列不为空表示channel满)
  • lock: 一把互斥大锁

    初始化channel

    func makechan(t *chantype, size int) *hchan {
    	elem := t.elem
    	// compiler checks this but be safe.
    	if elem.size >= 1<<16 {
    		throw("makechan: invalid channel element type")
    	}
    	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    		throw("makechan: bad alignment")
    	}
    	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    		panic(plainError("makechan: size out of range"))
    	}
    	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    	// buf points into the same allocation, elemtype is persistent.
    	// SudoG's are referenced from their owning thread so they can't be collected.
    	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    	var c *hchan
    	switch {
    	case mem == 0:
    		// Queue or element size is zero.
    		c = (*hchan)(mallocgc(hchanSize, nil, true))
    		// Race detector uses this location for synchronization.
    		c.buf = c.raceaddr()
    	case elem.ptrdata == 0:
    		// Elements do not contain pointers.
    		// Allocate hchan and buf in one call.
    		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    		c.buf = add(unsafe.Pointer(c), hchanSize)
    	default:
    		// Elements contain pointers.
    		c = new(hchan)
    		c.buf = mallocgc(mem, elem, true)
    	}
    	c.elemsize = uint16(elem.size)
    	c.elemtype = elem
    	c.dataqsiz = uint(size)
    	lockInit(&c.lock, lockRankHchan)
    	if debugChan {
    		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    	}
    	return c
    }
    

    1.校验channel元素类型大小不能超过64k、元素内存对齐系数不超过8字节;

    2.校验缓冲区申请的长度是否合法;

    3.根据channel类型分配不同的内存:阻塞通道(长度为0)就创建包含空buf的通道;非阻塞且结构体数据的通道(长度大于0,元素为struct{xxx})就创建hchan+长度*元素大小的内存;非阻塞且指针数据的通道(长度大于0,元素为ptr)就创建hchan和另一个长度大小的内存作为buf;

    channel发送

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
       if c == nil {
       	if !block {
       		return false
       	}
       	gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
       	throw("unreachable")
       }
       if debugChan {
       	print("chansend: chan=", c, "\n")
       }
       if raceenabled {
       	racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
       }
       if !block && c.closed == 0 && full(c) {
       	return false
       }
       var t0 int64
       if blockprofilerate > 0 {
       	t0 = cputicks()
       }
       lock(&c.lock)
       if c.closed != 0 {
       	unlock(&c.lock)
       	panic(plainError("send on closed channel"))
       }
       if sg := c.recvq.dequeue(); sg != nil {
       	// Found a waiting receiver. We pass the value we want to send
       	// directly to the receiver, bypassing the channel buffer (if any).
       	send(c, sg, ep, func() { unlock(&c.lock) }, 3)
       	return true
       }
       if c.qcount < c.dataqsiz {
       	// Space is available in the channel buffer. Enqueue the element to send.
       	qp := chanbuf(c, c.sendx)
       	if raceenabled {
       		racenotify(c, c.sendx, nil)
       	}
       	typedmemmove(c.elemtype, qp, ep)
       	c.sendx++
       	if c.sendx == c.dataqsiz {
       		c.sendx = 0
       	}
       	c.qcount++
       	unlock(&c.lock)
       	return true
       }
       if !block {
       	unlock(&c.lock)
       	return false
       }
       // Block on the channel. Some receiver will complete our operation for us.
       gp := getg()
       mysg := acquireSudog()
       mysg.releasetime = 0
       if t0 != 0 {
       	mysg.releasetime = -1
       }
       // No stack splits between assigning elem and enqueuing mysg
       // on gp.waiting where copystack can find it.
       mysg.elem = ep
       mysg.waitlink = nil
       mysg.g = gp
       mysg.isSelect = false
       mysg.c = c
       gp.waiting = mysg
       gp.param = nil
       c.sendq.enqueue(mysg)
       atomic.Store8(&gp.parkingOnChan, 1)
       gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
       KeepAlive(ep)
       // someone woke us up.
       if mysg != gp.waiting {
       	throw("G waiting list is corrupted")
       }
       gp.waiting = nil
       gp.activeStackChans = false
       closed := !mysg.success
       gp.param = nil
       if mysg.releasetime > 0 {
       	blockevent(mysg.releasetime-t0, 2)
       }
       mysg.c = nil
       releaseSudog(mysg)
       if closed {
       	if c.closed == 0 {
       		throw("chansend: spurious wakeup")
       	}
       	panic(plainError("send on closed channel"))
       }
       return true
    }
    

    1.如果是非阻塞通道且通道满了,忽略本次数据发送,直接返回false;

    2.加锁锁住通道;

    3.如果通道关闭,表示往关闭的通道发送数据,崩溃;

    4.接收协程队列非空,表示通道无数据,接收协程们都在摸鱼等待数据,出队一个协程发送数据(数据拷贝给目标协程,并将目标协程放入当前线程m的p本地队列中等待调度),解锁通道,返回true;

    5.通道有数据且没满,将数据移动到缓冲区末尾,解锁通道,返回true;

    6.如果通道非阻塞的,经过以上步骤到这里表示通道满了,忽略本次数据发送,解锁通道,返回false;

    7.经过以上步骤到这里表示通道满了且是阻塞通道,将当前协程的数据加入发送协程队列,挂起当前协程等待被接收协程唤醒;

    8.出让线程m,让m去执行其它逻辑;

    channel接收

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    	// raceenabled: don't need to check ep, as it is always on the stack
    	// or is new memory allocated by reflect.
    	if debugChan {
    		print("chanrecv: chan=", c, "\n")
    	}
    	if c == nil {
    		if !block {
    			return
    		}
    		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    		throw("unreachable")
    	}
    	// Fast path: check for failed non-blocking operation without acquiring the lock.
    	if !block && empty(c) {
    		if atomic.Load(&c.closed) == 0 {
    			return
    		}
    		if empty(c) {
    			// The channel is irreversibly closed and empty.
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false
    		}
    	}
    	var t0 int64
    	if blockprofilerate > 0 {
    		t0 = cputicks()
    	}
    	lock(&c.lock)
    	if c.closed != 0 {
    		if c.qcount == 0 {
    			if raceenabled {
    				raceacquire(c.raceaddr())
    			}
    			unlock(&c.lock)
    			if ep != nil {
    				typedmemclr(c.elemtype, ep)
    			}
    			return true, false
    		}
    		// The channel has been closed, but the channel's buffer have data.
    	} else {
    		// Just found waiting sender with not closed.
    		if sg := c.sendq.dequeue(); sg != nil {
    			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    			return true, true
    		}
    	}
    	if c.qcount > 0 {
    		// Receive directly from queue
    		qp := chanbuf(c, c.recvx)
    		if raceenabled {
    			racenotify(c, c.recvx, nil)
    		}
    		if ep != nil {
    			typedmemmove(c.elemtype, ep, qp)
    		}
    		typedmemclr(c.elemtype, qp)
    		c.recvx++
    		if c.recvx == c.dataqsiz {
    			c.recvx = 0
    		}
    		c.qcount--
    		unlock(&c.lock)
    		return true, true
    	}
    	if !block {
    		unlock(&c.lock)
    		return false, false
    	}
    	// no sender available: block on this channel.
    	gp := getg()
    	mysg := acquireSudog()
    	mysg.releasetime = 0
    	if t0 != 0 {
    		mysg.releasetime = -1
    	}
    	// No stack splits between assigning elem and enqueuing mysg
    	// on gp.waiting where copystack can find it.
    	mysg.elem = ep
    	mysg.waitlink = nil
    	gp.waiting = mysg
    	mysg.g = gp
    	mysg.isSelect = false
    	mysg.c = c
    	gp.param = nil
    	c.recvq.enqueue(mysg)
    	atomic.Store8(&gp.parkingOnChan, 1)
    	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    	// someone woke us up
    	if mysg != gp.waiting {
    		throw("G waiting list is corrupted")
    	}
    	gp.waiting = nil
    	gp.activeStackChans = false
    	if mysg.releasetime > 0 {
    		blockevent(mysg.releasetime-t0, 2)
    	}
    	success := mysg.success
    	gp.param = nil
    	mysg.c = nil
    	releaseSudog(mysg)
    	return true, success
    }
    

    1.如果通道是非阻塞的且通道没有数据,返回false;

    2.加锁锁住通道;

    3.通道关闭了,解锁通道,返回false;

    4.如果发送协程队列非空,表示阻塞通道上有协程卡在发送数据,执行当前协程接收数据(复制数据到当前协程,和发送时一样环形阻塞的发送协程),解锁通道,返回true;

    5.否则经过以上且通道有数据,直接移动数据到当前协程,解锁通道,返回true;

    6.挂起当前接收协程,等到发送协程唤醒;

    7.让出执行线程m去执行别的逻辑;

    channel关闭

    func closechan(c *hchan) {
    	if c == nil {
    		panic(plainError("close of nil channel"))
    	}
    	lock(&c.lock)
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))
    	}
    	if raceenabled {
    		callerpc := getcallerpc()
    		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
    		racerelease(c.raceaddr())
    	}
    	c.closed = 1
    	var glist gList
    	// release all readers
    	for {
    		sg := c.recvq.dequeue()
    		if sg == nil {
    			break
    		}
    		if sg.elem != nil {
    			typedmemclr(c.elemtype, sg.elem)
    			sg.elem = nil
    		}
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    	// release all writers (they will panic)
    	for {
    		sg := c.sendq.dequeue()
    		if sg == nil {
    			break
    		}
    		sg.elem = nil
    		if sg.releasetime != 0 {
    			sg.releasetime = cputicks()
    		}
    		gp := sg.g
    		gp.param = unsafe.Pointer(sg)
    		sg.success = false
    		if raceenabled {
    			raceacquireg(gp, c.raceaddr())
    		}
    		glist.push(gp)
    	}
    	unlock(&c.lock)
    	// Ready all Gs now that we've dropped the channel lock.
    	for !glist.empty() {
    		gp := glist.pop()
    		gp.schedlink = 0
    		goready(gp, 3)
    	}
    }
    

    1.加锁锁住通道;

    2.如果接收协程队列非空,说明通道为空,这些协程在摸鱼,给他们每个发送一个nil数据,并将每个协程加入临时协程队列记录;

    3.如果发送协程队列非空,说明通道满,给他们每个发送一个nil数据,并将每个协程加入临时协程队列;

    4.解锁通道;

    5.判断临时协程队列非空,说明有需要唤醒的协程,遍历唤醒;

    总结

    channel的大体功能函数就分析完了,逻辑还是挺简单的,主要是利用环形缓冲区、lock的加速设计、协程休眠、唤醒机制。

    不过还需要注意select {case: ; default:}语句块包裹下的收发:如果是发送或者接收逻辑包裹了select+default,会标记为无阻塞通道,收发不成功立即返回,不会休眠当前协程。