1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
13 uint32 runtime_Hchansize = sizeof(Hchan);
15 static void dequeueg(WaitQ*);
16 static SudoG* dequeue(WaitQ*);
17 static void enqueue(WaitQ*, SudoG*);
18 static void racesync(Hchan*, SudoG*);
21 makechan(ChanType *t, int64 hint)
27 elem = t->__element_type;
29 // compiler checks this but be safe.
30 if(elem->__size >= (1<<16))
31 runtime_throw("makechan: invalid channel element type");
33 if(hint < 0 || (intgo)hint != hint || (elem->__size > 0 && (uintptr)hint > (MaxMem - sizeof(*c)) / elem->__size))
34 runtime_panicstring("makechan: size out of range");
37 n = ROUND(n, elem->__align);
39 // allocate memory in one call
40 c = (Hchan*)runtime_mallocgc(sizeof(*c) + hint*elem->__size, (uintptr)t | TypeInfo_Chan, 0);
41 c->elemsize = elem->__size;
46 runtime_printf("makechan: chan=%p; elemsize=%D; dataqsiz=%D\n",
47 c, (int64)elem->__size, (int64)c->dataqsiz);
52 func reflect.makechan(t *ChanType, size uint64) (c *Hchan) {
53 c = makechan(t, size);
57 __go_new_channel(ChanType *t, uintptr hint)
59 return makechan(t, hint);
63 __go_new_channel_big(ChanType *t, uint64 hint)
65 return makechan(t, hint);
69 * generic single channel send/recv
70 * if the bool pointer is nil,
71 * then the full exchange will
72 * occur. if pres is not nil,
73 * then the protocol will not
74 * sleep but return if it could
77 * sleep can wake up with g->param == nil
78 * when a channel involved in the sleep has
79 * been closed. it is easiest to loop and re-run
80 * the operation; we'll see that it's now closed.
83 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
94 runtime_racereadobjectpc(ep, t->__element_type, runtime_getcallerpc(&t), chansend);
100 runtime_park(nil, nil, "chan send (nil chan)");
101 return false; // not reached
104 if(runtime_gcwaiting())
108 runtime_printf("chansend: chan=%p\n", c);
112 mysg.releasetime = 0;
113 if(runtime_blockprofilerate > 0) {
114 t0 = runtime_cputicks();
115 mysg.releasetime = -1;
120 runtime_racereadpc(c, pc, chansend);
127 sg = dequeue(&c->recvq);
136 runtime_memmove(sg->elem, ep, c->elemsize);
138 sg->releasetime = runtime_cputicks();
150 mysg.selectdone = nil;
152 enqueue(&c->sendq, &mysg);
153 runtime_parkunlock(c, "chan send");
155 if(g->param == nil) {
158 runtime_throw("chansend: spurious wakeup");
162 if(mysg.releasetime > 0)
163 runtime_blockevent(mysg.releasetime - t0, 2);
171 if(c->qcount >= c->dataqsiz) {
178 mysg.selectdone = nil;
179 enqueue(&c->sendq, &mysg);
180 runtime_parkunlock(c, "chan send");
187 runtime_racerelease(chanbuf(c, c->sendx));
189 runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize);
190 if(++c->sendx == c->dataqsiz)
194 sg = dequeue(&c->recvq);
199 sg->releasetime = runtime_cputicks();
203 if(mysg.releasetime > 0)
204 runtime_blockevent(mysg.releasetime - t0, 2);
209 runtime_panicstring("send on closed channel");
210 return false; // not reached
215 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
223 if(runtime_gcwaiting())
226 // raceenabled: don't need to check ep, as it is always on the stack.
229 runtime_printf("chanrecv: chan=%p\n", c);
237 runtime_park(nil, nil, "chan receive (nil chan)");
238 return false; // not reached
242 mysg.releasetime = 0;
243 if(runtime_blockprofilerate > 0) {
244 t0 = runtime_cputicks();
245 mysg.releasetime = -1;
255 sg = dequeue(&c->sendq);
262 runtime_memmove(ep, sg->elem, c->elemsize);
266 sg->releasetime = runtime_cputicks();
281 mysg.selectdone = nil;
283 enqueue(&c->recvq, &mysg);
284 runtime_parkunlock(c, "chan receive");
286 if(g->param == nil) {
289 runtime_throw("chanrecv: spurious wakeup");
295 if(mysg.releasetime > 0)
296 runtime_blockevent(mysg.releasetime - t0, 2);
312 mysg.selectdone = nil;
313 enqueue(&c->recvq, &mysg);
314 runtime_parkunlock(c, "chan receive");
321 runtime_raceacquire(chanbuf(c, c->recvx));
324 runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize);
325 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
326 if(++c->recvx == c->dataqsiz)
330 sg = dequeue(&c->sendq);
335 sg->releasetime = runtime_cputicks();
342 if(mysg.releasetime > 0)
343 runtime_blockevent(mysg.releasetime - t0, 2);
348 runtime_memclr(ep, c->elemsize);
352 runtime_raceacquire(c);
354 if(mysg.releasetime > 0)
355 runtime_blockevent(mysg.releasetime - t0, 2);
359 // The compiler generates a call to __go_send_small to send a value 8
362 __go_send_small(ChanType *t, Hchan* c, uint64 val)
366 byte b[sizeof(uint64)];
372 #ifndef WORDS_BIGENDIAN
375 v = u.b + sizeof(uint64) - t->__element_type->__size;
377 chansend(t, c, v, true, runtime_getcallerpc(&t));
380 // The compiler generates a call to __go_send_big to send a value
381 // larger than 8 bytes or smaller.
383 __go_send_big(ChanType *t, Hchan* c, byte* v)
385 chansend(t, c, v, true, runtime_getcallerpc(&t));
388 // The compiler generates a call to __go_receive to receive a
389 // value from a channel.
391 __go_receive(ChanType *t, Hchan* c, byte* v)
393 chanrecv(t, c, v, true, nil);
396 _Bool runtime_chanrecv2(ChanType *t, Hchan* c, byte* v)
397 __asm__ (GOSYM_PREFIX "runtime.chanrecv2");
400 runtime_chanrecv2(ChanType *t, Hchan* c, byte* v)
402 bool received = false;
404 chanrecv(t, c, v, true, &received);
408 // compiler implements
419 // if selectnbsend(c, v) {
425 func selectnbsend(t *ChanType, c *Hchan, elem *byte) (selected bool) {
426 selected = chansend(t, c, elem, false, runtime_getcallerpc(&t));
429 // compiler implements
440 // if selectnbrecv(&v, c) {
446 func selectnbrecv(t *ChanType, elem *byte, c *Hchan) (selected bool) {
447 selected = chanrecv(t, c, elem, false, nil);
450 // compiler implements
461 // if c != nil && selectnbrecv2(&v, &ok, c) {
467 func selectnbrecv2(t *ChanType, elem *byte, received *bool, c *Hchan) (selected bool) {
470 selected = chanrecv(t, c, elem, false, received == nil ? nil : &r);
475 func reflect.chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool) {
476 selected = chansend(t, c, elem, !nb, runtime_getcallerpc(&t));
479 func reflect.chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool, received bool) {
481 selected = chanrecv(t, c, elem, !nb, &received);
484 static Select* newselect(int32);
486 func newselect(size int32) (sel *byte) {
487 sel = (byte*)newselect(size);
491 newselect(int32 size)
500 // allocate all the memory we need in a single allocation
501 // start with Select with size cases
502 // then lockorder with size entries
503 // then pollorder with size entries
504 sel = runtime_mal(sizeof(*sel) +
505 n*sizeof(sel->scase[0]) +
506 size*sizeof(sel->lockorder[0]) +
507 size*sizeof(sel->pollorder[0]));
511 sel->lockorder = (void*)(sel->scase + size);
512 sel->pollorder = (void*)(sel->lockorder + size);
515 runtime_printf("newselect s=%p size=%d\n", sel, size);
519 // cut in half to give stack a chance to split
520 static void selectsend(Select *sel, Hchan *c, int index, void *elem);
522 func selectsend(sel *Select, c *Hchan, elem *byte, index int32) {
523 // nil cases do not compete
525 selectsend(sel, c, index, elem);
529 selectsend(Select *sel, Hchan *c, int index, void *elem)
536 runtime_throw("selectsend: too many cases");
538 cas = &sel->scase[i];
542 cas->kind = CaseSend;
546 runtime_printf("selectsend s=%p index=%d chan=%p\n",
547 sel, cas->index, cas->chan);
550 // cut in half to give stack a chance to split
551 static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*);
553 func selectrecv(sel *Select, c *Hchan, elem *byte, index int32) {
554 // nil cases do not compete
556 selectrecv(sel, c, index, elem, nil);
559 func selectrecv2(sel *Select, c *Hchan, elem *byte, received *bool, index int32) {
560 // nil cases do not compete
562 selectrecv(sel, c, index, elem, received);
566 selectrecv(Select *sel, Hchan *c, int index, void *elem, bool *received)
573 runtime_throw("selectrecv: too many cases");
575 cas = &sel->scase[i];
579 cas->kind = CaseRecv;
581 cas->receivedp = received;
584 runtime_printf("selectrecv s=%p index=%d chan=%p\n",
585 sel, cas->index, cas->chan);
588 // cut in half to give stack a chance to split
589 static void selectdefault(Select*, int);
591 func selectdefault(sel *Select, index int32) {
592 selectdefault(sel, index);
596 selectdefault(Select *sel, int32 index)
603 runtime_throw("selectdefault: too many cases");
605 cas = &sel->scase[i];
609 cas->kind = CaseDefault;
612 runtime_printf("selectdefault s=%p index=%d\n",
623 for(i=0; i<sel->ncase; i++) {
624 c0 = sel->lockorder[i];
626 c = sel->lockorder[i];
633 selunlock(Select *sel)
638 // We must be very careful here to not touch sel after we have unlocked
639 // the last lock, because sel can be freed right after the last unlock.
640 // Consider the following situation.
641 // First M calls runtime_park() in runtime_selectgo() passing the sel.
642 // Once runtime_park() has unlocked the last lock, another M makes
643 // the G that calls select runnable again and schedules it for execution.
644 // When the G runs on another M, it locks all the locks and frees sel.
645 // Now if the first M touches sel, it will access freed memory.
646 n = (int32)sel->ncase;
648 // skip the default case
649 if(n>0 && sel->lockorder[0] == nil)
651 for(i = n-1; i >= r; i--) {
652 c = sel->lockorder[i];
653 if(i>0 && sel->lockorder[i-1] == c)
654 continue; // will unlock it on the next iteration
660 selparkcommit(G *gp, void *sel)
668 runtime_park(nil, nil, "select (no cases)"); // forever
671 static int selectgo(Select**);
673 // selectgo(sel *byte);
675 func selectgo(sel *Select) (ret int32) {
676 return selectgo(&sel);
680 selectgo(Select **selp)
683 uint32 o, i, j, k, done;
693 if(runtime_gcwaiting())
697 runtime_printf("select: sel=%p\n", sel);
702 if(runtime_blockprofilerate > 0) {
703 t0 = runtime_cputicks();
704 for(i=0; i<sel->ncase; i++)
705 sel->scase[i].sg.releasetime = -1;
708 // The compiler rewrites selects that statically have
709 // only 0 or 1 cases plus default into simpler constructs.
710 // The only way we can end up with such small sel->ncase
711 // values here is for a larger select in which most channels
712 // have been nilled out. The general code handles those
713 // cases correctly, and they are rare enough not to bother
714 // optimizing (and needing to test).
716 // generate permuted order
717 for(i=0; i<sel->ncase; i++)
718 sel->pollorder[i] = i;
719 for(i=1; i<sel->ncase; i++) {
720 o = sel->pollorder[i];
721 j = runtime_fastrand1()%(i+1);
722 sel->pollorder[i] = sel->pollorder[j];
723 sel->pollorder[j] = o;
726 // sort the cases by Hchan address to get the locking order.
727 // simple heap sort, to guarantee n log n time and constant stack footprint.
728 for(i=0; i<sel->ncase; i++) {
730 c = sel->scase[j].chan;
731 while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
732 sel->lockorder[j] = sel->lockorder[k];
735 sel->lockorder[j] = c;
737 for(i=sel->ncase; i-->0; ) {
738 c = sel->lockorder[i];
739 sel->lockorder[i] = sel->lockorder[0];
745 if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
747 if(c < sel->lockorder[k]) {
748 sel->lockorder[j] = sel->lockorder[k];
754 sel->lockorder[j] = c;
757 for(i=0; i+1<sel->ncase; i++)
758 if(sel->lockorder[i] > sel->lockorder[i+1]) {
759 runtime_printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
760 runtime_throw("select: broken sort");
766 // pass 1 - look for something already waiting
768 for(i=0; i<sel->ncase; i++) {
769 o = sel->pollorder[i];
770 cas = &sel->scase[o];
775 if(c->dataqsiz > 0) {
779 sg = dequeue(&c->sendq);
789 runtime_racereadpc(c, runtime_selectgo, chansend);
792 if(c->dataqsiz > 0) {
793 if(c->qcount < c->dataqsiz)
796 sg = dequeue(&c->recvq);
815 // pass 2 - enqueue on all chans
817 for(i=0; i<sel->ncase; i++) {
818 o = sel->pollorder[i];
819 cas = &sel->scase[o];
823 sg->selectdone = &done;
827 enqueue(&c->recvq, sg);
831 enqueue(&c->sendq, sg);
837 runtime_park(selparkcommit, sel, "select");
842 // pass 3 - dequeue from unsuccessful chans
843 // otherwise they stack up on quiet channels
844 for(i=0; i<sel->ncase; i++) {
845 cas = &sel->scase[i];
846 if(cas != (Scase*)sg) {
848 if(cas->kind == CaseSend)
862 runtime_throw("selectgo: shouldn't happen");
865 runtime_printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
866 sel, c, cas, cas->kind);
868 if(cas->kind == CaseRecv) {
869 if(cas->receivedp != nil)
870 *cas->receivedp = true;
874 if(cas->kind == CaseRecv && cas->sg.elem != nil)
875 runtime_racewriteobjectpc(cas->sg.elem, c->elemtype, selectgo, chanrecv);
876 else if(cas->kind == CaseSend)
877 runtime_racereadobjectpc(cas->sg.elem, c->elemtype, selectgo, chansend);
884 // can receive from buffer
886 if(cas->sg.elem != nil)
887 runtime_racewriteobjectpc(cas->sg.elem, c->elemtype, selectgo, chanrecv);
888 runtime_raceacquire(chanbuf(c, c->recvx));
890 if(cas->receivedp != nil)
891 *cas->receivedp = true;
892 if(cas->sg.elem != nil)
893 runtime_memmove(cas->sg.elem, chanbuf(c, c->recvx), c->elemsize);
894 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
895 if(++c->recvx == c->dataqsiz)
898 sg = dequeue(&c->sendq);
903 sg->releasetime = runtime_cputicks();
911 // can send to buffer
913 runtime_racerelease(chanbuf(c, c->sendx));
914 runtime_racereadobjectpc(cas->sg.elem, c->elemtype, selectgo, chansend);
916 runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
917 if(++c->sendx == c->dataqsiz)
920 sg = dequeue(&c->recvq);
925 sg->releasetime = runtime_cputicks();
933 // can receive from sleeping sender (sg)
935 if(cas->sg.elem != nil)
936 runtime_racewriteobjectpc(cas->sg.elem, c->elemtype, selectgo, chanrecv);
941 runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
942 if(cas->receivedp != nil)
943 *cas->receivedp = true;
944 if(cas->sg.elem != nil)
945 runtime_memmove(cas->sg.elem, sg->elem, c->elemsize);
949 sg->releasetime = runtime_cputicks();
954 // read at end of closed channel
956 if(cas->receivedp != nil)
957 *cas->receivedp = false;
958 if(cas->sg.elem != nil)
959 runtime_memclr(cas->sg.elem, c->elemsize);
961 runtime_raceacquire(c);
965 // can send to sleeping receiver (sg)
967 runtime_racereadobjectpc(cas->sg.elem, c->elemtype, selectgo, chansend);
972 runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
974 runtime_memmove(sg->elem, cas->sg.elem, c->elemsize);
978 sg->releasetime = runtime_cputicks();
982 // return index corresponding to chosen case
984 if(cas->sg.releasetime > 0)
985 runtime_blockevent(cas->sg.releasetime - t0, 2);
990 // send on closed channel
992 runtime_panicstring("send on closed channel");
993 return 0; // not reached
996 // This struct must match ../reflect/value.go:/runtimeSelect.
997 typedef struct runtimeSelect runtimeSelect;
1006 // This enum must match ../reflect/value.go:/SelectDir.
1013 func reflect.rselect(cases Slice) (chosen int, recvOK bool) {
1016 runtimeSelect* rcase, *rc;
1021 rcase = (runtimeSelect*)cases.__values;
1023 sel = newselect(cases.__count);
1024 for(i=0; i<cases.__count; i++) {
1028 selectdefault(sel, i);
1033 selectsend(sel, rc->ch, i, rc->val);
1038 selectrecv(sel, rc->ch, i, rc->val, &recvOK);
1043 chosen = (intgo)(uintptr)selectgo(&sel);
1046 static void closechan(Hchan *c, void *pc);
1048 func closechan(c *Hchan) {
1049 closechan(c, runtime_getcallerpc(&c));
1052 func reflect.chanclose(c *Hchan) {
1053 closechan(c, runtime_getcallerpc(&c));
1057 closechan(Hchan *c, void *pc)
1063 runtime_panicstring("close of nil channel");
1065 if(runtime_gcwaiting())
1071 runtime_panicstring("close of closed channel");
1075 runtime_racewritepc(c, pc, runtime_closechan);
1076 runtime_racerelease(c);
1081 // release all readers
1083 sg = dequeue(&c->recvq);
1089 sg->releasetime = runtime_cputicks();
1093 // release all writers
1095 sg = dequeue(&c->sendq);
1101 sg->releasetime = runtime_cputicks();
1109 __go_builtin_close(Hchan *c)
1111 runtime_closechan(c);
1114 func reflect.chanlen(c *Hchan) (len int) {
1122 __go_chan_len(Hchan *c)
1124 return reflect_chanlen(c);
1127 func reflect.chancap(c *Hchan) (cap int) {
1135 __go_chan_cap(Hchan *c)
1137 return reflect_chancap(c);
1149 q->first = sgp->link;
1151 // if sgp participates in a select and is already signaled, ignore it
1152 if(sgp->selectdone != nil) {
1153 // claim the right to signal
1154 if(*sgp->selectdone != 0 || !runtime_cas(sgp->selectdone, 0, 1))
1164 SudoG **l, *sgp, *prevsgp;
1169 for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
1180 enqueue(WaitQ *q, SudoG *sgp)
1183 if(q->first == nil) {
1188 q->last->link = sgp;
1193 racesync(Hchan *c, SudoG *sg)
1195 runtime_racerelease(chanbuf(c, 0));
1196 runtime_raceacquireg(sg->g, chanbuf(c, 0));
1197 runtime_racereleaseg(sg->g, chanbuf(c, 0));
1198 runtime_raceacquire(chanbuf(c, 0));