// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// This file contains the implementation of Go channels.

// At least one of c.sendq and c.recvq is empty,
// except for the case of an unbuffered channel with a single goroutine
// blocked on it for both sending and receiving using a select statement,
// in which case the length of c.sendq and c.recvq is limited only by the
// size of the select statement.
//
// For buffered channels, also:
// c.qcount > 0 implies that c.recvq is empty.
// c.qcount < c.dataqsiz implies that c.sendq is empty.
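//
// For example, given c := make(chan int, 1): while c.qcount == 1, an
// arriving receiver is handed a buffered value immediately rather than
// parking, so c.recvq stays empty; once c.qcount == c.dataqsiz it is
// senders that park, so outside the select corner case above the two
// wait queues are never both non-empty.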
22 "runtime/internal/atomic"
23 "runtime/internal/math"

// For gccgo, use go:linkname to export compiler-called functions.
//
//go:linkname makechan
//go:linkname makechan64
//go:linkname chansend1
//go:linkname chanrecv1
//go:linkname chanrecv2
//go:linkname closechan
//go:linkname selectnbsend
//go:linkname selectnbrecv

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)
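
// hchanSize rounds unsafe.Sizeof(hchan{}) up to a multiple of maxAlign:
// -int(n)&(maxAlign-1) is the number of padding bytes required. For
// instance, if the raw struct size were 90 bytes, -90&7 == 6, giving a
// rounded hchanSize of 96.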

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
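
// For example, on a channel of int64 elements (elemsize 8), chanbuf(c, 3)
// returns c.buf plus 24 bytes: the address of the fourth slot of the
// circular buffer.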

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}
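
// That staleness is why non-blocking sends are best-effort: a select with a
// default case may observe full(c) == true and take the default even though
// a receiver frees a slot an instant later. For example:
//
//	select {
//	case c <- v: // taken only if the send can proceed right now
//	default:     // may be taken although c gains room a moment later
//	}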

// entry point for c <- x from compiled code
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed. it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// Check preemption, since unlike gc we don't check on every call.
	if getg().preempt {
		checkPreempt()
	}

	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
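
// For illustration, the user-visible behavior of two of the paths above:
//
//	c := make(chan int, 1)
//	c <- 1 // buffer not full: stored via the c.qcount < c.dataqsiz path
//	close(c)
//	c <- 2 // panics: "send on closed channel"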

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
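
// For illustration, the user-visible effect of closechan:
//
//	c := make(chan int)
//	close(c)
//	v, ok := <-c // proceeds immediately: v == 0, ok == false
//	close(c)     // panics: "close of closed channel"
//
// Any sender still parked on c when it is closed is readied with
// sg.success == false and panics with "send on closed channel".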

// empty reports whether a read from c would block (that is, the channel is
// empty). It uses a single atomic read of mutable state.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
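
// chanrecv2 backs the comma-ok receive: the compiler lowers
//
//	v, ok := <-c
//
// to a chanrecv2 call, with ok reporting whether the value came from a
// send (true) rather than from a close (false).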

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// Check preemption, since unlike gc we don't check on every call.
	if getg().preempt {
		checkPreempt()
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock. This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
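
// For illustration, the closed-channel paths above in user terms:
//
//	c := make(chan int, 1)
//	c <- 7
//	close(c)
//	v, ok := <-c // 7, true: buffered data is still delivered after close
//	v, ok = <-c  // 0, false: closed and drained, so *ep is zeroed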

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
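
// For illustration, the buffered ("queue is full") case above in user terms:
// with c := make(chan int, 1) holding 1 and a goroutine parked sending 2,
// a receive returns 1 from the head slot, stores 2 into that same slot,
// and readies the parked sender.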

func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	atomic.Store8(&gp.parkingOnChan, 0)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}

//go:linkname reflect_chansend reflect.chansend
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, getcallerpc())
}

//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.qcount)
}

//go:linkname reflectlite_chanlen internal_1reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.qcount)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.dataqsiz)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudog)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
			continue
		}

		return sgp
	}
}

func (c *hchan) raceaddr() unsafe.Pointer {
	// Treat read-like and write-like operations on the channel as
	// happening at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&c.buf)
}

func racesync(c *hchan, sg *sudog) {
	racerelease(chanbuf(c, 0))
	raceacquireg(sg.g, chanbuf(c, 0))
	racereleaseg(sg.g, chanbuf(c, 0))
	raceacquire(chanbuf(c, 0))
}

// Notify the race detector of a send or receive involving buffer entry idx
// and a channel c or its communicating partner sg.
// This function handles the special case of c.elemsize==0.
func racenotify(c *hchan, idx uint, sg *sudog) {
	// We could have passed the unsafe.Pointer corresponding to entry idx
	// instead of idx itself. However, in a future version of this function,
	// we can use idx to better handle the case of elemsize==0.
	// A future improvement to the detector is to call TSan with c and idx:
	// this way, Go will continue to not allocate buffer entries for channels
	// of elemsize==0, yet the race detector can be made to handle multiple
	// sync objects underneath the hood (one sync object per idx)
	qp := chanbuf(c, idx)
	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses the
	// c.buf as the only buffer entry. This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire). Instead, we accumulate happens-before
	// information in the synchronization object associated with c.buf.
	if c.elemsize == 0 {
		if sg == nil {
			raceacquire(qp)
			racerelease(qp)
		} else {
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
		}
	} else {
		if sg == nil {
			racereleaseacquire(qp)
		} else {
			racereleaseacquireg(sg.g, qp)
		}
	}
}