1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
13 uint32 runtime_Hchansize = sizeof(Hchan);
15 static void dequeueg(WaitQ*);
16 static SudoG* dequeue(WaitQ*);
17 static void enqueue(WaitQ*, SudoG*);
18 static void racesync(Hchan*, SudoG*);
21 makechan(ChanType *t, int64 hint)
27 elem = t->__element_type;
29 // compiler checks this but be safe.
30 if(elem->__size >= (1<<16))
31 runtime_throw("makechan: invalid channel element type");
33 if(hint < 0 || (intgo)hint != hint || (elem->__size > 0 && (uintptr)hint > (MaxMem - sizeof(*c)) / elem->__size))
34 runtime_panicstring("makechan: size out of range");
37 n = ROUND(n, elem->__align);
39 // allocate memory in one call
40 c = (Hchan*)runtime_mallocgc(sizeof(*c) + hint*elem->__size, (uintptr)t | TypeInfo_Chan, 0);
41 c->elemsize = elem->__size;
46 runtime_printf("makechan: chan=%p; elemsize=%D; dataqsiz=%D\n",
47 c, (int64)elem->__size, (int64)c->dataqsiz);
52 func reflect.makechan(t *ChanType, size uint64) (c *Hchan) {
53 c = makechan(t, size);
57 __go_new_channel(ChanType *t, uintptr hint)
59 return makechan(t, hint);
63 __go_new_channel_big(ChanType *t, uint64 hint)
65 return makechan(t, hint);
69 * generic single channel send/recv
70 * if block is true,
71 * then the full exchange will
72 * occur. if block is false,
73 * then the protocol will not
74 * sleep but return if it could not complete.
77 * sleep can wake up with g->param == nil
78 * when a channel involved in the sleep has
79 * been closed. it is easiest to loop and re-run
80 * the operation; we'll see that it's now closed.
83 chansend(ChanType *t, Hchan *c, byte *ep, bool block, void *pc)
94 runtime_racereadobjectpc(ep, t->__element_type, runtime_getcallerpc(&t), chansend);
100 runtime_park(nil, nil, "chan send (nil chan)");
101 return false; // not reached
104 if(runtime_gcwaiting())
108 runtime_printf("chansend: chan=%p\n", c);
112 mysg.releasetime = 0;
113 if(runtime_blockprofilerate > 0) {
114 t0 = runtime_cputicks();
115 mysg.releasetime = -1;
120 runtime_racereadpc(c, pc, chansend);
127 sg = dequeue(&c->recvq);
136 runtime_memmove(sg->elem, ep, c->elemsize);
138 sg->releasetime = runtime_cputicks();
150 mysg.selectdone = nil;
152 enqueue(&c->sendq, &mysg);
153 runtime_parkunlock(c, "chan send");
155 if(g->param == nil) {
158 runtime_throw("chansend: spurious wakeup");
162 if(mysg.releasetime > 0)
163 runtime_blockevent(mysg.releasetime - t0, 2);
171 if(c->qcount >= c->dataqsiz) {
178 mysg.selectdone = nil;
179 enqueue(&c->sendq, &mysg);
180 runtime_parkunlock(c, "chan send");
187 runtime_raceacquire(chanbuf(c, c->sendx));
188 runtime_racerelease(chanbuf(c, c->sendx));
191 runtime_memmove(chanbuf(c, c->sendx), ep, c->elemsize);
192 if(++c->sendx == c->dataqsiz)
196 sg = dequeue(&c->recvq);
201 sg->releasetime = runtime_cputicks();
205 if(mysg.releasetime > 0)
206 runtime_blockevent(mysg.releasetime - t0, 2);
211 runtime_panicstring("send on closed channel");
212 return false; // not reached
217 chanrecv(ChanType *t, Hchan* c, byte *ep, bool block, bool *received)
225 if(runtime_gcwaiting())
228 // raceenabled: don't need to check ep, as it is always on the stack.
231 runtime_printf("chanrecv: chan=%p\n", c);
239 runtime_park(nil, nil, "chan receive (nil chan)");
240 return false; // not reached
244 mysg.releasetime = 0;
245 if(runtime_blockprofilerate > 0) {
246 t0 = runtime_cputicks();
247 mysg.releasetime = -1;
257 sg = dequeue(&c->sendq);
264 runtime_memmove(ep, sg->elem, c->elemsize);
268 sg->releasetime = runtime_cputicks();
283 mysg.selectdone = nil;
285 enqueue(&c->recvq, &mysg);
286 runtime_parkunlock(c, "chan receive");
288 if(g->param == nil) {
291 runtime_throw("chanrecv: spurious wakeup");
297 if(mysg.releasetime > 0)
298 runtime_blockevent(mysg.releasetime - t0, 2);
314 mysg.selectdone = nil;
315 enqueue(&c->recvq, &mysg);
316 runtime_parkunlock(c, "chan receive");
323 runtime_raceacquire(chanbuf(c, c->recvx));
324 runtime_racerelease(chanbuf(c, c->recvx));
328 runtime_memmove(ep, chanbuf(c, c->recvx), c->elemsize);
329 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
330 if(++c->recvx == c->dataqsiz)
334 sg = dequeue(&c->sendq);
339 sg->releasetime = runtime_cputicks();
346 if(mysg.releasetime > 0)
347 runtime_blockevent(mysg.releasetime - t0, 2);
352 runtime_memclr(ep, c->elemsize);
356 runtime_raceacquire(c);
358 if(mysg.releasetime > 0)
359 runtime_blockevent(mysg.releasetime - t0, 2);
363 // The compiler generates a call to __go_send_small to send a value 8 bytes or smaller.
366 __go_send_small(ChanType *t, Hchan* c, uint64 val)
370 byte b[sizeof(uint64)];
376 #ifndef WORDS_BIGENDIAN
379 v = u.b + sizeof(uint64) - t->__element_type->__size;
381 chansend(t, c, v, true, runtime_getcallerpc(&t));
384 // The compiler generates a call to __go_send_big to send a value
385 // larger than 8 bytes.
387 __go_send_big(ChanType *t, Hchan* c, byte* v)
389 chansend(t, c, v, true, runtime_getcallerpc(&t));
392 // The compiler generates a call to __go_receive to receive a
393 // value from a channel.
395 __go_receive(ChanType *t, Hchan* c, byte* v)
397 chanrecv(t, c, v, true, nil);
400 _Bool runtime_chanrecv2(ChanType *t, Hchan* c, byte* v)
401 __asm__ (GOSYM_PREFIX "runtime.chanrecv2");
404 runtime_chanrecv2(ChanType *t, Hchan* c, byte* v)
406 bool received = false;
408 chanrecv(t, c, v, true, &received);
412 // compiler implements
423 // if selectnbsend(c, v) {
429 func selectnbsend(t *ChanType, c *Hchan, elem *byte) (selected bool) {
430 selected = chansend(t, c, elem, false, runtime_getcallerpc(&t));
433 // compiler implements
444 // if selectnbrecv(&v, c) {
450 func selectnbrecv(t *ChanType, elem *byte, c *Hchan) (selected bool) {
451 selected = chanrecv(t, c, elem, false, nil);
454 // compiler implements
465 // if c != nil && selectnbrecv2(&v, &ok, c) {
471 func selectnbrecv2(t *ChanType, elem *byte, received *bool, c *Hchan) (selected bool) {
474 selected = chanrecv(t, c, elem, false, received == nil ? nil : &r);
479 func reflect.chansend(t *ChanType, c *Hchan, elem *byte, nb bool) (selected bool) {
480 selected = chansend(t, c, elem, !nb, runtime_getcallerpc(&t));
483 func reflect.chanrecv(t *ChanType, c *Hchan, nb bool, elem *byte) (selected bool, received bool) {
485 selected = chanrecv(t, c, elem, !nb, &received);
488 static Select* newselect(int32);
490 func newselect(size int32) (sel *byte) {
491 sel = (byte*)newselect(size);
495 newselect(int32 size)
504 // allocate all the memory we need in a single allocation
505 // start with Select with size cases
506 // then lockorder with size entries
507 // then pollorder with size entries
508 sel = runtime_mal(sizeof(*sel) +
509 n*sizeof(sel->scase[0]) +
510 size*sizeof(sel->lockorder[0]) +
511 size*sizeof(sel->pollorder[0]));
515 sel->lockorder = (void*)(sel->scase + size);
516 sel->pollorder = (void*)(sel->lockorder + size);
519 runtime_printf("newselect s=%p size=%d\n", sel, size);
523 // cut in half to give stack a chance to split
524 static void selectsend(Select *sel, Hchan *c, int index, void *elem);
526 func selectsend(sel *Select, c *Hchan, elem *byte, index int32) {
527 // nil cases do not compete
529 selectsend(sel, c, index, elem);
533 selectsend(Select *sel, Hchan *c, int index, void *elem)
540 runtime_throw("selectsend: too many cases");
542 cas = &sel->scase[i];
546 cas->kind = CaseSend;
550 runtime_printf("selectsend s=%p index=%d chan=%p\n",
551 sel, cas->index, cas->chan);
554 // cut in half to give stack a chance to split
555 static void selectrecv(Select *sel, Hchan *c, int index, void *elem, bool*);
557 func selectrecv(sel *Select, c *Hchan, elem *byte, index int32) {
558 // nil cases do not compete
560 selectrecv(sel, c, index, elem, nil);
563 func selectrecv2(sel *Select, c *Hchan, elem *byte, received *bool, index int32) {
564 // nil cases do not compete
566 selectrecv(sel, c, index, elem, received);
570 selectrecv(Select *sel, Hchan *c, int index, void *elem, bool *received)
577 runtime_throw("selectrecv: too many cases");
579 cas = &sel->scase[i];
583 cas->kind = CaseRecv;
585 cas->receivedp = received;
588 runtime_printf("selectrecv s=%p index=%d chan=%p\n",
589 sel, cas->index, cas->chan);
592 // cut in half to give stack a chance to split
593 static void selectdefault(Select*, int);
595 func selectdefault(sel *Select, index int32) {
596 selectdefault(sel, index);
600 selectdefault(Select *sel, int32 index)
607 runtime_throw("selectdefault: too many cases");
609 cas = &sel->scase[i];
613 cas->kind = CaseDefault;
616 runtime_printf("selectdefault s=%p index=%d\n",
627 for(i=0; i<sel->ncase; i++) {
628 c0 = sel->lockorder[i];
630 c = sel->lockorder[i];
637 selunlock(Select *sel)
642 // We must be very careful here to not touch sel after we have unlocked
643 // the last lock, because sel can be freed right after the last unlock.
644 // Consider the following situation.
645 // First M calls runtime_park() in runtime_selectgo() passing the sel.
646 // Once runtime_park() has unlocked the last lock, another M makes
647 // the G that calls select runnable again and schedules it for execution.
648 // When the G runs on another M, it locks all the locks and frees sel.
649 // Now if the first M touches sel, it will access freed memory.
650 n = (int32)sel->ncase;
652 // skip the default case
653 if(n>0 && sel->lockorder[0] == nil)
655 for(i = n-1; i >= r; i--) {
656 c = sel->lockorder[i];
657 if(i>0 && sel->lockorder[i-1] == c)
658 continue; // will unlock it on the next iteration
664 selparkcommit(G *gp, void *sel)
672 runtime_park(nil, nil, "select (no cases)"); // forever
675 static int selectgo(Select**);
677 // selectgo(sel *byte);
679 func selectgo(sel *Select) (ret int32) {
680 return selectgo(&sel);
684 selectgo(Select **selp)
687 uint32 o, i, j, k, done;
697 if(runtime_gcwaiting())
701 runtime_printf("select: sel=%p\n", sel);
706 if(runtime_blockprofilerate > 0) {
707 t0 = runtime_cputicks();
708 for(i=0; i<sel->ncase; i++)
709 sel->scase[i].sg.releasetime = -1;
712 // The compiler rewrites selects that statically have
713 // only 0 or 1 cases plus default into simpler constructs.
714 // The only way we can end up with such small sel->ncase
715 // values here is for a larger select in which most channels
716 // have been nilled out. The general code handles those
717 // cases correctly, and they are rare enough not to bother
718 // optimizing (and needing to test).
720 // generate permuted order
721 for(i=0; i<sel->ncase; i++)
722 sel->pollorder[i] = i;
723 for(i=1; i<sel->ncase; i++) {
724 o = sel->pollorder[i];
725 j = runtime_fastrand1()%(i+1);
726 sel->pollorder[i] = sel->pollorder[j];
727 sel->pollorder[j] = o;
730 // sort the cases by Hchan address to get the locking order.
731 // simple heap sort, to guarantee n log n time and constant stack footprint.
732 for(i=0; i<sel->ncase; i++) {
734 c = sel->scase[j].chan;
735 while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
736 sel->lockorder[j] = sel->lockorder[k];
739 sel->lockorder[j] = c;
741 for(i=sel->ncase; i-->0; ) {
742 c = sel->lockorder[i];
743 sel->lockorder[i] = sel->lockorder[0];
749 if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
751 if(c < sel->lockorder[k]) {
752 sel->lockorder[j] = sel->lockorder[k];
758 sel->lockorder[j] = c;
761 for(i=0; i+1<sel->ncase; i++)
762 if(sel->lockorder[i] > sel->lockorder[i+1]) {
763 runtime_printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
764 runtime_throw("select: broken sort");
770 // pass 1 - look for something already waiting
772 for(i=0; i<sel->ncase; i++) {
773 o = sel->pollorder[i];
774 cas = &sel->scase[o];
779 if(c->dataqsiz > 0) {
783 sg = dequeue(&c->sendq);
793 runtime_racereadpc(c, runtime_selectgo, chansend);
796 if(c->dataqsiz > 0) {
797 if(c->qcount < c->dataqsiz)
800 sg = dequeue(&c->recvq);
819 // pass 2 - enqueue on all chans
821 for(i=0; i<sel->ncase; i++) {
822 o = sel->pollorder[i];
823 cas = &sel->scase[o];
827 sg->selectdone = &done;
831 enqueue(&c->recvq, sg);
835 enqueue(&c->sendq, sg);
841 runtime_park(selparkcommit, sel, "select");
846 // pass 3 - dequeue from unsuccessful chans
847 // otherwise they stack up on quiet channels
848 for(i=0; i<sel->ncase; i++) {
849 cas = &sel->scase[i];
850 if(cas != (Scase*)sg) {
852 if(cas->kind == CaseSend)
866 runtime_throw("selectgo: shouldn't happen");
869 runtime_printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
870 sel, c, cas, cas->kind);
872 if(cas->kind == CaseRecv) {
873 if(cas->receivedp != nil)
874 *cas->receivedp = true;
878 if(cas->kind == CaseRecv && cas->sg.elem != nil)
879 runtime_racewriteobjectpc(cas->sg.elem, c->elemtype, selectgo, chanrecv);
880 else if(cas->kind == CaseSend)
881 runtime_racereadobjectpc(cas->sg.elem, c->elemtype, selectgo, chansend);
888 // can receive from buffer
890 if(cas->sg.elem != nil)
891 runtime_racewriteobjectpc(cas->sg.elem, c->elemtype, selectgo, chanrecv);
892 runtime_raceacquire(chanbuf(c, c->recvx));
893 runtime_racerelease(chanbuf(c, c->recvx));
895 if(cas->receivedp != nil)
896 *cas->receivedp = true;
897 if(cas->sg.elem != nil)
898 runtime_memmove(cas->sg.elem, chanbuf(c, c->recvx), c->elemsize);
899 runtime_memclr(chanbuf(c, c->recvx), c->elemsize);
900 if(++c->recvx == c->dataqsiz)
903 sg = dequeue(&c->sendq);
908 sg->releasetime = runtime_cputicks();
916 // can send to buffer
918 runtime_raceacquire(chanbuf(c, c->sendx));
919 runtime_racerelease(chanbuf(c, c->sendx));
920 runtime_racereadobjectpc(cas->sg.elem, c->elemtype, selectgo, chansend);
922 runtime_memmove(chanbuf(c, c->sendx), cas->sg.elem, c->elemsize);
923 if(++c->sendx == c->dataqsiz)
926 sg = dequeue(&c->recvq);
931 sg->releasetime = runtime_cputicks();
939 // can receive from sleeping sender (sg)
941 if(cas->sg.elem != nil)
942 runtime_racewriteobjectpc(cas->sg.elem, c->elemtype, selectgo, chanrecv);
947 runtime_printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
948 if(cas->receivedp != nil)
949 *cas->receivedp = true;
950 if(cas->sg.elem != nil)
951 runtime_memmove(cas->sg.elem, sg->elem, c->elemsize);
955 sg->releasetime = runtime_cputicks();
960 // read at end of closed channel
962 if(cas->receivedp != nil)
963 *cas->receivedp = false;
964 if(cas->sg.elem != nil)
965 runtime_memclr(cas->sg.elem, c->elemsize);
967 runtime_raceacquire(c);
971 // can send to sleeping receiver (sg)
973 runtime_racereadobjectpc(cas->sg.elem, c->elemtype, selectgo, chansend);
978 runtime_printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
980 runtime_memmove(sg->elem, cas->sg.elem, c->elemsize);
984 sg->releasetime = runtime_cputicks();
988 // return index corresponding to chosen case
990 if(cas->sg.releasetime > 0)
991 runtime_blockevent(cas->sg.releasetime - t0, 2);
996 // send on closed channel
998 runtime_panicstring("send on closed channel");
999 return 0; // not reached
1002 // This struct must match ../reflect/value.go:/runtimeSelect.
1003 typedef struct runtimeSelect runtimeSelect;
1004 struct runtimeSelect
1012 // This enum must match ../reflect/value.go:/SelectDir.
1019 func reflect.rselect(cases Slice) (chosen int, recvOK bool) {
1022 runtimeSelect* rcase, *rc;
1027 rcase = (runtimeSelect*)cases.__values;
1029 sel = newselect(cases.__count);
1030 for(i=0; i<cases.__count; i++) {
1034 selectdefault(sel, i);
1039 selectsend(sel, rc->ch, i, rc->val);
1044 selectrecv(sel, rc->ch, i, rc->val, &recvOK);
1049 chosen = (intgo)(uintptr)selectgo(&sel);
1052 static void closechan(Hchan *c, void *pc);
1054 func closechan(c *Hchan) {
1055 closechan(c, runtime_getcallerpc(&c));
1058 func reflect.chanclose(c *Hchan) {
1059 closechan(c, runtime_getcallerpc(&c));
1063 closechan(Hchan *c, void *pc)
1069 runtime_panicstring("close of nil channel");
1071 if(runtime_gcwaiting())
1077 runtime_panicstring("close of closed channel");
1081 runtime_racewritepc(c, pc, runtime_closechan);
1082 runtime_racerelease(c);
1087 // release all readers
1089 sg = dequeue(&c->recvq);
1095 sg->releasetime = runtime_cputicks();
1099 // release all writers
1101 sg = dequeue(&c->sendq);
1107 sg->releasetime = runtime_cputicks();
1115 __go_builtin_close(Hchan *c)
1117 runtime_closechan(c);
1120 func reflect.chanlen(c *Hchan) (len int) {
1128 __go_chan_len(Hchan *c)
1130 return reflect_chanlen(c);
1133 func reflect.chancap(c *Hchan) (cap int) {
1141 __go_chan_cap(Hchan *c)
1143 return reflect_chancap(c);
1155 q->first = sgp->link;
1157 // if sgp participates in a select and is already signaled, ignore it
1158 if(sgp->selectdone != nil) {
1159 // claim the right to signal
1160 if(*sgp->selectdone != 0 || !runtime_cas(sgp->selectdone, 0, 1))
1170 SudoG **l, *sgp, *prevsgp;
1175 for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
1186 enqueue(WaitQ *q, SudoG *sgp)
1189 if(q->first == nil) {
1194 q->last->link = sgp;
1199 racesync(Hchan *c, SudoG *sg)
1201 runtime_racerelease(chanbuf(c, 0));
1202 runtime_raceacquireg(sg->g, chanbuf(c, 0));
1203 runtime_racereleaseg(sg->g, chanbuf(c, 0));
1204 runtime_raceacquire(chanbuf(c, 0));