1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
12 uint32 runtime_Hchansize = sizeof(Hchan);
14 static void dequeueg(WaitQ*);
15 static SudoG* dequeue(WaitQ*);
16 static void enqueue(WaitQ*, SudoG*);
19 makechan(ChanType *t, int64 hint)
25 elem = t->__element_type;
27 // compiler checks this but be safe.
28 if(elem->__size >= (1<<16))
29 runtime_throw("makechan: invalid channel element type");
31 if(hint < 0 || (intgo)hint != hint || (elem->__size > 0 && (uintptr)hint > (MaxMem - sizeof(*c)) / elem->__size))
32 runtime_panicstring("makechan: size out of range");
35 n = ROUND(n, elem->__align);
37 // allocate memory in one call
38 c = (Hchan*)runtime_mallocgc(sizeof(*c) + hint*elem->__size, (uintptr)t | TypeInfo_Chan, 0);
39 c->elemsize = elem->__size;
44 runtime_printf("makechan: chan=%p; elemsize=%D; dataqsiz=%D\n",
45 c, (int64)elem->__size, (int64)c->dataqsiz);
50 func reflect.makechan(t *ChanType, size uint64) (c *Hchan) {
51 c = makechan(t, size);
55 __go_new_channel(ChanType *t, uintptr hint)
57 return makechan(t, hint);
61 __go_new_channel_big(ChanType *t, uint64 hint)
63 return makechan(t, hint);
67 * generic single channel send/recv
68 * if the bool pointer is nil,
69 * then the full exchange will
70 * occur. if pres is not nil,
71 * then the protocol will not
72 * sleep but return if it could
75 * sleep can wake up with g->param == nil
76 * when a channel involved in the sleep has
77 * been closed. it is easiest to loop and re-run
78 * the operation; we'll see that it's now closed.
81 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
96 runtime_park(nil, nil, "chan send (nil chan)");
97 return false; // not reached
100 if(runtime_gcwaiting())
104 runtime_printf("chansend: chan=%p\n", c);
108 mysg.releasetime = 0;
109 if(runtime_blockprofilerate > 0) {
110 t0 = runtime_cputicks();
111 mysg.releasetime = -1;
121 sg = dequeue(&c->recvq);
128 runtime_memmove(sg->elem, ep, c->elemsize);
130 sg->releasetime = runtime_cputicks();
142 mysg.selectdone = nil;
144 enqueue(&c->sendq, &mysg);
145 runtime_parkunlock(c, "chan send");
147 if(g->param == nil) {
150 runtime_throw("chansend: spurious wakeup");
154 if(mysg.releasetime > 0)
155 runtime_blockevent(mysg.releasetime - t0, 2);
163 if(c->qcount >= c->dataqsiz) {
170 mysg.selectdone = nil;
171 enqueue(&c->sendq, &mysg);
172 runtime_parkunlock(c, "chan send");
178 runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize);
179 if(++c->sendx == c->dataqsiz)
183 sg = dequeue(&c->recvq);
188 sg->releasetime = runtime_cputicks();
192 if(mysg.releasetime > 0)
193 runtime_blockevent(mysg.releasetime - t0, 2);
198 runtime_panicstring("send on closed channel");
199 return false; // not reached
204 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
212 if(runtime_gcwaiting())
216 runtime_printf("chanrecv: chan=%p\n", c);
224 runtime_park(nil, nil, "chan receive (nil chan)");
225 return false; // not reached
229 mysg.releasetime = 0;
230 if(runtime_blockprofilerate > 0) {
231 t0 = runtime_cputicks();
232 mysg.releasetime = -1;
242 sg = dequeue(&c->sendq);
247 runtime_memmove(ep, sg->elem, c->elemsize);
251 sg->releasetime = runtime_cputicks();
266 mysg.selectdone = nil;
268 enqueue(&c->recvq, &mysg);
269 runtime_parkunlock(c, "chan receive");
271 if(g->param == nil) {
274 runtime_throw("chanrecv: spurious wakeup");
280 if(mysg.releasetime > 0)
281 runtime_blockevent(mysg.releasetime - t0, 2);
297 mysg.selectdone = nil;
298 enqueue(&c->recvq, &mysg);
299 runtime_parkunlock(c, "chan receive");
306 runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize);
307 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
308 if(++c->recvx == c->dataqsiz)
312 sg = dequeue(&c->sendq);
317 sg->releasetime = runtime_cputicks();
324 if(mysg.releasetime > 0)
325 runtime_blockevent(mysg.releasetime - t0, 2);
330 runtime_memclr(ep, c->elemsize);
334 if(mysg.releasetime > 0)
335 runtime_blockevent(mysg.releasetime - t0, 2);
339 // The compiler generates a call to __go_send_small to send a value 8
342 __go_send_small(ChanType *t, Hchan* c, uint64 val)
346 byte b[sizeof(uint64)];
352 #ifndef WORDS_BIGENDIAN
355 v = u.b + sizeof(uint64) - t->__element_type->__size;
357 chansend(t, c, v, true, runtime_getcallerpc(&t));
360 // The compiler generates a call to __go_send_big to send a value
361 // larger than 8 bytes or smaller.
363 __go_send_big(ChanType *t, Hchan* c, byte* v)
365 chansend(t, c, v, true, runtime_getcallerpc(&t));
368 // The compiler generates a call to __go_receive to receive a
369 // value from a channel.
371 __go_receive(ChanType *t, Hchan* c, byte* v)
373 chanrecv(t, c, v, true, nil);
376 _Bool runtime_chanrecv2(ChanType *t, Hchan* c, byte* v)
377 __asm__ (GOSYM_PREFIX "runtime.chanrecv2");
380 runtime_chanrecv2(ChanType *t, Hchan* c, byte* v)
382 bool received = false;
384 chanrecv(t, c, v, true, &received);
388 // compiler implements
399 // if selectnbsend(c, v) {
405 func selectnbsend(t *ChanType, c *Hchan, elem *byte) (selected bool) {
406 selected = chansend(t, c, elem, false, runtime_getcallerpc(&t));
409 // compiler implements
420 // if selectnbrecv(&v, c) {
426 func selectnbrecv(t *ChanType, elem *byte, c *Hchan) (selected bool) {
427 selected = chanrecv(t, c, elem, false, nil);
430 // compiler implements
441 // if c != nil && selectnbrecv2(&v, &ok, c) {
447 func selectnbrecv2(t *ChanType, elem *byte, received *bool, c *Hchan) (selected bool) {
450 selected = chanrecv(t, c, elem, false, received == nil ? nil : &r);
455 func reflect.chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool) {
456 selected = chansend(t, c, elem, !nb, runtime_getcallerpc(&t));
459 func reflect.chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool, received bool) {
461 selected = chanrecv(t, c, elem, !nb, &received);
464 static Select* newselect(int32);
466 func newselect(size int32) (sel *byte) {
467 sel = (byte*)newselect(size);
471 newselect(int32 size)
480 // allocate all the memory we need in a single allocation
481 // start with Select with size cases
482 // then lockorder with size entries
483 // then pollorder with size entries
484 sel = runtime_mal(sizeof(*sel) +
485 n*sizeof(sel->scase[0]) +
486 size*sizeof(sel->lockorder[0]) +
487 size*sizeof(sel->pollorder[0]));
491 sel->lockorder = (void*)(sel->scase + size);
492 sel->pollorder = (void*)(sel->lockorder + size);
495 runtime_printf("newselect s=%p size=%d\n", sel, size);
499 // cut in half to give stack a chance to split
500 static void selectsend(Select *sel, Hchan *c, int index, void *elem);
502 func selectsend(sel *Select, c *Hchan, elem *byte, index int32) {
503 // nil cases do not compete
505 selectsend(sel, c, index, elem);
509 selectsend(Select *sel, Hchan *c, int index, void *elem)
516 runtime_throw("selectsend: too many cases");
518 cas = &sel->scase[i];
522 cas->kind = CaseSend;
526 runtime_printf("selectsend s=%p index=%d chan=%p\n",
527 sel, cas->index, cas->chan);
530 // cut in half to give stack a chance to split
531 static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*);
533 func selectrecv(sel *Select, c *Hchan, elem *byte, index int32) {
534 // nil cases do not compete
536 selectrecv(sel, c, index, elem, nil);
539 func selectrecv2(sel *Select, c *Hchan, elem *byte, received *bool, index int32) {
540 // nil cases do not compete
542 selectrecv(sel, c, index, elem, received);
546 selectrecv(Select *sel, Hchan *c, int index, void *elem, bool *received)
553 runtime_throw("selectrecv: too many cases");
555 cas = &sel->scase[i];
559 cas->kind = CaseRecv;
561 cas->receivedp = received;
564 runtime_printf("selectrecv s=%p index=%d chan=%p\n",
565 sel, cas->index, cas->chan);
568 // cut in half to give stack a chance to split
569 static void selectdefault(Select*, int);
571 func selectdefault(sel *Select, index int32) {
572 selectdefault(sel, index);
576 selectdefault(Select *sel, int32 index)
583 runtime_throw("selectdefault: too many cases");
585 cas = &sel->scase[i];
589 cas->kind = CaseDefault;
592 runtime_printf("selectdefault s=%p index=%d\n",
603 for(i=0; i<sel->ncase; i++) {
604 c0 = sel->lockorder[i];
606 c = sel->lockorder[i];
613 selunlock(Select *sel)
618 // We must be very careful here to not touch sel after we have unlocked
619 // the last lock, because sel can be freed right after the last unlock.
620 // Consider the following situation.
621 // First M calls runtime_park() in runtime_selectgo() passing the sel.
622 // Once runtime_park() has unlocked the last lock, another M makes
623 // the G that calls select runnable again and schedules it for execution.
624 // When the G runs on another M, it locks all the locks and frees sel.
625 // Now if the first M touches sel, it will access freed memory.
626 n = (int32)sel->ncase;
628 // skip the default case
629 if(n>0 && sel->lockorder[0] == nil)
631 for(i = n-1; i >= r; i--) {
632 c = sel->lockorder[i];
633 if(i>0 && sel->lockorder[i-1] == c)
634 continue; // will unlock it on the next iteration
640 selparkcommit(G *gp, void *sel)
648 runtime_park(nil, nil, "select (no cases)"); // forever
651 static int selectgo(Select**);
653 // selectgo(sel *byte);
655 func selectgo(sel *Select) (ret int32) {
656 return selectgo(&sel);
660 selectgo(Select **selp)
663 uint32 o, i, j, k, done;
673 if(runtime_gcwaiting())
677 runtime_printf("select: sel=%p\n", sel);
682 if(runtime_blockprofilerate > 0) {
683 t0 = runtime_cputicks();
684 for(i=0; i<sel->ncase; i++)
685 sel->scase[i].sg.releasetime = -1;
688 // The compiler rewrites selects that statically have
689 // only 0 or 1 cases plus default into simpler constructs.
690 // The only way we can end up with such small sel->ncase
691 // values here is for a larger select in which most channels
692 // have been nilled out. The general code handles those
693 // cases correctly, and they are rare enough not to bother
694 // optimizing (and needing to test).
696 // generate permuted order
697 for(i=0; i<sel->ncase; i++)
698 sel->pollorder[i] = i;
699 for(i=1; i<sel->ncase; i++) {
700 o = sel->pollorder[i];
701 j = runtime_fastrand1()%(i+1);
702 sel->pollorder[i] = sel->pollorder[j];
703 sel->pollorder[j] = o;
706 // sort the cases by Hchan address to get the locking order.
707 // simple heap sort, to guarantee n log n time and constant stack footprint.
708 for(i=0; i<sel->ncase; i++) {
710 c = sel->scase[j].chan;
711 while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
712 sel->lockorder[j] = sel->lockorder[k];
715 sel->lockorder[j] = c;
717 for(i=sel->ncase; i-->0; ) {
718 c = sel->lockorder[i];
719 sel->lockorder[i] = sel->lockorder[0];
725 if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
727 if(c < sel->lockorder[k]) {
728 sel->lockorder[j] = sel->lockorder[k];
734 sel->lockorder[j] = c;
737 for(i=0; i+1<sel->ncase; i++)
738 if(sel->lockorder[i] > sel->lockorder[i+1]) {
739 runtime_printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
740 runtime_throw("select: broken sort");
746 // pass 1 - look for something already waiting
748 for(i=0; i<sel->ncase; i++) {
749 o = sel->pollorder[i];
750 cas = &sel->scase[o];
755 if(c->dataqsiz > 0) {
759 sg = dequeue(&c->sendq);
770 if(c->dataqsiz > 0) {
771 if(c->qcount < c->dataqsiz)
774 sg = dequeue(&c->recvq);
793 // pass 2 - enqueue on all chans
795 for(i=0; i<sel->ncase; i++) {
796 o = sel->pollorder[i];
797 cas = &sel->scase[o];
801 sg->selectdone = &done;
805 enqueue(&c->recvq, sg);
809 enqueue(&c->sendq, sg);
815 runtime_park(selparkcommit, sel, "select");
820 // pass 3 - dequeue from unsuccessful chans
821 // otherwise they stack up on quiet channels
822 for(i=0; i<sel->ncase; i++) {
823 cas = &sel->scase[i];
824 if(cas != (Scase*)sg) {
826 if(cas->kind == CaseSend)
840 runtime_throw("selectgo: shouldn't happen");
843 runtime_printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
844 sel, c, cas, cas->kind);
846 if(cas->kind == CaseRecv) {
847 if(cas->receivedp != nil)
848 *cas->receivedp = true;
855 // can receive from buffer
856 if(cas->receivedp != nil)
857 *cas->receivedp = true;
858 if(cas->sg.elem != nil)
859 runtime_memmove(cas->sg.elem, chanbuf(c, c->recvx), c->elemsize);
860 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
861 if(++c->recvx == c->dataqsiz)
864 sg = dequeue(&c->sendq);
869 sg->releasetime = runtime_cputicks();
877 // can send to buffer
878 runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
879 if(++c->sendx == c->dataqsiz)
882 sg = dequeue(&c->recvq);
887 sg->releasetime = runtime_cputicks();
895 // can receive from sleeping sender (sg)
898 runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
899 if(cas->receivedp != nil)
900 *cas->receivedp = true;
901 if(cas->sg.elem != nil)
902 runtime_memmove(cas->sg.elem, sg->elem, c->elemsize);
906 sg->releasetime = runtime_cputicks();
911 // read at end of closed channel
913 if(cas->receivedp != nil)
914 *cas->receivedp = false;
915 if(cas->sg.elem != nil)
916 runtime_memclr(cas->sg.elem, c->elemsize);
920 // can send to sleeping receiver (sg)
923 runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
925 runtime_memmove(sg->elem, cas->sg.elem, c->elemsize);
929 sg->releasetime = runtime_cputicks();
933 // return index corresponding to chosen case
935 if(cas->sg.releasetime > 0)
936 runtime_blockevent(cas->sg.releasetime - t0, 2);
941 // send on closed channel
943 runtime_panicstring("send on closed channel");
944 return 0; // not reached
947 // This struct must match ../reflect/value.go:/runtimeSelect.
948 typedef struct runtimeSelect runtimeSelect;
957 // This enum must match ../reflect/value.go:/SelectDir.
964 func reflect.rselect(cases Slice) (chosen int, recvOK bool) {
967 runtimeSelect* rcase, *rc;
972 rcase = (runtimeSelect*)cases.__values;
974 sel = newselect(cases.__count);
975 for(i=0; i<cases.__count; i++) {
979 selectdefault(sel, i);
984 selectsend(sel, rc->ch, i, rc->val);
989 selectrecv(sel, rc->ch, i, rc->val, &recvOK);
994 chosen = (intgo)(uintptr)selectgo(&sel);
997 static void closechan(Hchan *c, void *pc);
999 func closechan(c *Hchan) {
1000 closechan(c, runtime_getcallerpc(&c));
1003 func reflect.chanclose(c *Hchan) {
1004 closechan(c, runtime_getcallerpc(&c));
1008 closechan(Hchan *c, void *pc)
1015 runtime_panicstring("close of nil channel");
1017 if(runtime_gcwaiting())
1023 runtime_panicstring("close of closed channel");
1027 // release all readers
1029 sg = dequeue(&c->recvq);
1035 sg->releasetime = runtime_cputicks();
1039 // release all writers
1041 sg = dequeue(&c->sendq);
1047 sg->releasetime = runtime_cputicks();
1055 __go_builtin_close(Hchan *c)
1057 runtime_closechan(c);
1060 func reflect.chanlen(c *Hchan) (len int) {
1068 __go_chan_len(Hchan *c)
1070 return reflect_chanlen(c);
1073 func reflect.chancap(c *Hchan) (cap int) {
1081 __go_chan_cap(Hchan *c)
1083 return reflect_chancap(c);
1095 q->first = sgp->link;
1097 // if sgp participates in a select and is already signaled, ignore it
1098 if(sgp->selectdone != nil) {
1099 // claim the right to signal
1100 if(*sgp->selectdone != 0 || !runtime_cas(sgp->selectdone, 0, 1))
1110 SudoG **l, *sgp, *prevsgp;
1115 for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
1126 enqueue(WaitQ *q, SudoG *sgp)
1129 if(q->first == nil) {
1134 q->last->link = sgp;