1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
9 #define _MW_REHASH_MAX 11
11 static PRLock
*mw_lock
= NULL
;
12 static _PRGlobalState
*mw_state
= NULL
;
14 static PRIntervalTime max_polling_interval
;
18 typedef struct TimerEvent
{
19 PRIntervalTime absolute
;
26 #define TIMER_EVENT_PTR(_qp) \
27 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
32 PRCondVar
*cancel_timer
;
33 PRThread
*manager_thread
;
37 static PRStatus
TimerInit(void);
38 static void TimerManager(void *arg
);
39 static TimerEvent
*CreateTimer(PRIntervalTime timeout
,
40 void (*func
)(void *), void *arg
);
41 static PRBool
CancelTimer(TimerEvent
*timer
);
43 static void TimerManager(void *arg
)
46 PRIntervalTime timeout
;
53 if (PR_CLIST_IS_EMPTY(&tm_vars
.timer_queue
))
55 PR_WaitCondVar(tm_vars
.new_timer
, PR_INTERVAL_NO_TIMEOUT
);
59 now
= PR_IntervalNow();
60 head
= PR_LIST_HEAD(&tm_vars
.timer_queue
);
61 timer
= TIMER_EVENT_PTR(head
);
62 if ((PRInt32
) (now
- timer
->absolute
) >= 0)
66 * make its prev and next point to itself so that
67 * it's obvious that it's not on the timer_queue.
70 PR_ASSERT(2 == timer
->ref_count
);
71 PR_Unlock(tm_vars
.ml
);
72 timer
->func(timer
->arg
);
74 timer
->ref_count
-= 1;
75 if (0 == timer
->ref_count
)
77 PR_NotifyAllCondVar(tm_vars
.cancel_timer
);
82 timeout
= (PRIntervalTime
)(timer
->absolute
- now
);
83 PR_WaitCondVar(tm_vars
.new_timer
, timeout
);
87 PR_Unlock(tm_vars
.ml
);
90 static TimerEvent
*CreateTimer(
91 PRIntervalTime timeout
,
96 PRCList
*links
, *tail
;
99 timer
= PR_NEW(TimerEvent
);
102 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
105 timer
->absolute
= PR_IntervalNow() + timeout
;
108 timer
->ref_count
= 2;
110 tail
= links
= PR_LIST_TAIL(&tm_vars
.timer_queue
);
111 while (links
->prev
!= tail
)
113 elem
= TIMER_EVENT_PTR(links
);
114 if ((PRInt32
)(timer
->absolute
- elem
->absolute
) >= 0)
120 PR_INSERT_AFTER(&timer
->links
, links
);
121 PR_NotifyCondVar(tm_vars
.new_timer
);
122 PR_Unlock(tm_vars
.ml
);
126 static PRBool
CancelTimer(TimerEvent
*timer
)
128 PRBool canceled
= PR_FALSE
;
131 timer
->ref_count
-= 1;
132 if (timer
->links
.prev
== &timer
->links
)
134 while (timer
->ref_count
== 1)
136 PR_WaitCondVar(tm_vars
.cancel_timer
, PR_INTERVAL_NO_TIMEOUT
);
141 PR_REMOVE_LINK(&timer
->links
);
144 PR_Unlock(tm_vars
.ml
);
149 static PRStatus
TimerInit(void)
151 tm_vars
.ml
= PR_NewLock();
152 if (NULL
== tm_vars
.ml
)
156 tm_vars
.new_timer
= PR_NewCondVar(tm_vars
.ml
);
157 if (NULL
== tm_vars
.new_timer
)
161 tm_vars
.cancel_timer
= PR_NewCondVar(tm_vars
.ml
);
162 if (NULL
== tm_vars
.cancel_timer
)
166 PR_INIT_CLIST(&tm_vars
.timer_queue
);
167 tm_vars
.manager_thread
= PR_CreateThread(
168 PR_SYSTEM_THREAD
, TimerManager
, NULL
, PR_PRIORITY_NORMAL
,
169 PR_LOCAL_THREAD
, PR_UNJOINABLE_THREAD
, 0);
170 if (NULL
== tm_vars
.manager_thread
)
177 if (NULL
!= tm_vars
.cancel_timer
)
179 PR_DestroyCondVar(tm_vars
.cancel_timer
);
181 if (NULL
!= tm_vars
.new_timer
)
183 PR_DestroyCondVar(tm_vars
.new_timer
);
185 if (NULL
!= tm_vars
.ml
)
187 PR_DestroyLock(tm_vars
.ml
);
194 /******************************************************************/
195 /******************************************************************/
196 /************************ The private portion *********************/
197 /******************************************************************/
198 /******************************************************************/
199 void _PR_InitMW(void)
203 * We use NT 4's InterlockedCompareExchange() to operate
204 * on PRMWStatus variables.
206 PR_ASSERT(sizeof(LONG
) == sizeof(PRMWStatus
));
209 mw_lock
= PR_NewLock();
210 PR_ASSERT(NULL
!= mw_lock
);
211 mw_state
= PR_NEWZAP(_PRGlobalState
);
212 PR_ASSERT(NULL
!= mw_state
);
213 PR_INIT_CLIST(&mw_state
->group_list
);
214 max_polling_interval
= PR_MillisecondsToInterval(MAX_POLLING_INTERVAL
);
217 void _PR_CleanupMW(void)
219 PR_DestroyLock(mw_lock
);
221 if (mw_state
->group
) {
222 PR_DestroyWaitGroup(mw_state
->group
);
223 /* mw_state->group is set to NULL as a side effect. */
226 } /* _PR_CleanupMW */
228 static PRWaitGroup
*MW_Init2(void)
230 PRWaitGroup
*group
= mw_state
->group
; /* it's the null group */
231 if (NULL
== group
) /* there is this special case */
233 group
= PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH
);
234 if (NULL
== group
) goto failed_alloc
;
236 if (NULL
== mw_state
->group
)
238 mw_state
->group
= group
;
242 if (group
!= NULL
) (void)PR_DestroyWaitGroup(group
);
243 group
= mw_state
->group
; /* somebody beat us to it */
246 return group
; /* whatever */
249 static _PR_HashStory
MW_AddHashInternal(PRRecvWait
*desc
, _PRWaiterHash
*hash
)
252 ** The entries are put in the table using the fd (PRFileDesc*) of
253 ** the receive descriptor as the key. This allows us to locate
254 ** the appropriate entry aqain when the poll operation finishes.
256 ** The pointer to the file descriptor object is first divided by
257 ** the natural alignment of a pointer in the belief that object
258 ** will have at least that many zeros in the low order bits.
259 ** This may not be a good assuption.
261 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
262 ** that we declare defeat and force the table to be reconstructed.
263 ** Since some fds might be added more than once, won't that cause
264 ** collisions even in an empty table?
266 PRIntn rehash
= _MW_REHASH_MAX
;
268 PRUintn hidx
= _MW_HASH(desc
->fd
, hash
->length
);
273 waiter
= &hash
->recv_wait
;
274 if (NULL
== waiter
[hidx
])
279 printf("Adding 0x%x->0x%x ", desc
, desc
->fd
);
281 "table[%u:%u:*%u]: 0x%x->0x%x\n",
282 hidx
, hash
->count
, hash
->length
, waiter
[hidx
], waiter
[hidx
]->fd
);
284 return _prmw_success
;
286 if (desc
== waiter
[hidx
])
288 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0); /* desc already in table */
292 printf("Failing 0x%x->0x%x ", desc
, desc
->fd
);
294 "table[*%u:%u:%u]: 0x%x->0x%x\n",
295 hidx
, hash
->count
, hash
->length
, waiter
[hidx
], waiter
[hidx
]->fd
);
299 hoffset
= _MW_HASH2(desc
->fd
, hash
->length
);
300 PR_ASSERT(0 != hoffset
);
302 hidx
= (hidx
+ hoffset
) % (hash
->length
);
305 } /* MW_AddHashInternal */
307 static _PR_HashStory
MW_ExpandHashInternal(PRWaitGroup
*group
)
310 PRUint32 pidx
, length
;
311 _PRWaiterHash
*newHash
, *oldHash
= group
->waiter
;
315 static const PRInt32 prime_number
[] = {
316 _PR_DEFAULT_HASH_LENGTH
, 179, 521, 907, 1427,
317 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
318 PRUintn primes
= (sizeof(prime_number
) / sizeof(PRInt32
));
320 /* look up the next size we'd like to use for the hash table */
321 for (pidx
= 0; pidx
< primes
; ++pidx
)
323 if (prime_number
[pidx
] == oldHash
->length
)
328 /* table size must be one of the prime numbers */
329 PR_ASSERT(pidx
< primes
);
331 /* if pidx == primes - 1, we can't expand the table any more */
332 while (pidx
< primes
- 1)
336 length
= prime_number
[pidx
];
338 /* allocate the new hash table and fill it in with the old */
339 newHash
= (_PRWaiterHash
*)PR_CALLOC(
340 sizeof(_PRWaiterHash
) + (length
* sizeof(PRRecvWait
*)));
343 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
347 newHash
->length
= length
;
349 for (desc
= &oldHash
->recv_wait
;
350 newHash
->count
< oldHash
->count
; ++desc
)
352 PR_ASSERT(desc
< &oldHash
->recv_wait
+ oldHash
->length
);
355 hrv
= MW_AddHashInternal(*desc
, newHash
);
356 PR_ASSERT(_prmw_error
!= hrv
);
357 if (_prmw_success
!= hrv
)
367 PR_DELETE(group
->waiter
);
368 group
->waiter
= newHash
;
369 group
->p_timestamp
+= 1;
370 return _prmw_success
;
373 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
374 return _prmw_error
; /* we're hosed */
375 } /* MW_ExpandHashInternal */
378 static void _MW_DoneInternal(
379 PRWaitGroup
*group
, PRRecvWait
**waiter
, PRMWStatus outcome
)
382 ** Add this receive wait object to the list of finished I/O
383 ** operations for this particular group. If there are other
384 ** threads waiting on the group, notify one. If not, arrange
385 ** for this thread to return.
389 printf("Removing 0x%x->0x%x\n", *waiter
, (*waiter
)->fd
);
391 (*waiter
)->outcome
= outcome
;
392 PR_APPEND_LINK(&((*waiter
)->internal
), &group
->io_ready
);
393 PR_NotifyCondVar(group
->io_complete
);
394 PR_ASSERT(0 != group
->waiter
->count
);
395 group
->waiter
->count
-= 1;
397 } /* _MW_DoneInternal */
400 static PRRecvWait
**_MW_LookupInternal(PRWaitGroup
*group
, PRFileDesc
*fd
)
403 ** Find the receive wait object corresponding to the file descriptor.
404 ** Only search the wait group specified.
407 PRIntn rehash
= _MW_REHASH_MAX
;
408 _PRWaiterHash
*hash
= group
->waiter
;
409 PRUintn hidx
= _MW_HASH(fd
, hash
->length
);
414 desc
= (&hash
->recv_wait
) + hidx
;
415 if ((*desc
!= NULL
) && ((*desc
)->fd
== fd
)) return desc
;
418 hoffset
= _MW_HASH2(fd
, hash
->length
);
419 PR_ASSERT(0 != hoffset
);
421 hidx
= (hidx
+ hoffset
) % (hash
->length
);
424 } /* _MW_LookupInternal */
427 static PRStatus
_MW_PollInternal(PRWaitGroup
*group
)
430 PRStatus rv
= PR_FAILURE
;
431 PRInt32 count
, count_ready
;
432 PRIntervalTime polling_interval
;
434 group
->poller
= PR_GetCurrentThread();
438 PRIntervalTime now
, since_last_poll
;
439 PRPollDesc
*poll_list
;
441 while (0 == group
->waiter
->count
)
444 st
= PR_WaitCondVar(group
->new_business
, PR_INTERVAL_NO_TIMEOUT
);
445 if (_prmw_running
!= group
->state
)
447 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
450 if (_MW_ABORTED(st
)) goto aborted
;
454 ** There's something to do. See if our existing polling list
455 ** is large enough for what we have to do?
458 while (group
->polling_count
< group
->waiter
->count
)
460 PRUint32 old_count
= group
->waiter
->count
;
461 PRUint32 new_count
= PR_ROUNDUP(old_count
, _PR_POLL_COUNT_FUDGE
);
462 PRSize new_size
= sizeof(PRPollDesc
) * new_count
;
463 PRPollDesc
*old_polling_list
= group
->polling_list
;
465 PR_Unlock(group
->ml
);
466 poll_list
= (PRPollDesc
*)PR_CALLOC(new_size
);
467 if (NULL
== poll_list
)
469 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
473 if (NULL
!= old_polling_list
)
474 PR_DELETE(old_polling_list
);
476 if (_prmw_running
!= group
->state
)
478 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
481 group
->polling_list
= poll_list
;
482 group
->polling_count
= new_count
;
485 now
= PR_IntervalNow();
486 polling_interval
= max_polling_interval
;
487 since_last_poll
= now
- group
->last_poll
;
489 waiter
= &group
->waiter
->recv_wait
;
490 poll_list
= group
->polling_list
;
491 for (count
= 0; count
< group
->waiter
->count
; ++waiter
)
493 PR_ASSERT(waiter
< &group
->waiter
->recv_wait
494 + group
->waiter
->length
);
495 if (NULL
!= *waiter
) /* a live one! */
497 if ((PR_INTERVAL_NO_TIMEOUT
!= (*waiter
)->timeout
)
498 && (since_last_poll
>= (*waiter
)->timeout
))
499 _MW_DoneInternal(group
, waiter
, PR_MW_TIMEOUT
);
502 if (PR_INTERVAL_NO_TIMEOUT
!= (*waiter
)->timeout
)
504 (*waiter
)->timeout
-= since_last_poll
;
505 if ((*waiter
)->timeout
< polling_interval
)
506 polling_interval
= (*waiter
)->timeout
;
508 PR_ASSERT(poll_list
< group
->polling_list
509 + group
->polling_count
);
510 poll_list
->fd
= (*waiter
)->fd
;
511 poll_list
->in_flags
= PR_POLL_READ
;
512 poll_list
->out_flags
= 0;
515 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
516 poll_list
, count
, poll_list
->fd
, (*waiter
)->timeout
);
524 PR_ASSERT(count
== group
->waiter
->count
);
527 ** If there are no more threads waiting for completion,
528 ** we need to return.
530 if ((!PR_CLIST_IS_EMPTY(&group
->io_ready
))
531 && (1 == group
->waiting_threads
)) break;
533 if (0 == count
) continue; /* wait for new business */
535 group
->last_poll
= now
;
537 PR_Unlock(group
->ml
);
539 count_ready
= PR_Poll(group
->polling_list
, count
, polling_interval
);
543 if (_prmw_running
!= group
->state
)
545 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
548 if (-1 == count_ready
)
550 goto failed_poll
; /* that's a shame */
552 else if (0 < count_ready
)
554 for (poll_list
= group
->polling_list
; count
> 0;
555 poll_list
++, count
--)
558 poll_list
< group
->polling_list
+ group
->polling_count
);
559 if (poll_list
->out_flags
!= 0)
561 waiter
= _MW_LookupInternal(group
, poll_list
->fd
);
563 ** If 'waiter' is NULL, that means the wait receive
564 ** descriptor has been canceled.
567 _MW_DoneInternal(group
, waiter
, PR_MW_SUCCESS
);
572 ** If there are no more threads waiting for completion,
573 ** we need to return.
574 ** This thread was "borrowed" to do the polling, but it really
575 ** belongs to the client.
577 if ((!PR_CLIST_IS_EMPTY(&group
->io_ready
))
578 && (1 == group
->waiting_threads
)) break;
586 group
->poller
= NULL
; /* we were that, not we ain't */
587 if ((_prmw_running
== group
->state
) && (group
->waiting_threads
> 1))
589 /* Wake up one thread to become the new poller. */
590 PR_NotifyCondVar(group
->io_complete
);
592 return rv
; /* we return with the lock held */
593 } /* _MW_PollInternal */
596 static PRMWGroupState
MW_TestForShutdownInternal(PRWaitGroup
*group
)
598 PRMWGroupState rv
= group
->state
;
600 ** Looking at the group's fields is safe because
601 ** once the group's state is no longer running, it
602 ** cannot revert and there is a safe check on entry
603 ** to make sure no more threads are made to wait.
605 if ((_prmw_stopping
== rv
)
606 && (0 == group
->waiting_threads
))
608 rv
= group
->state
= _prmw_stopped
;
609 PR_NotifyCondVar(group
->mw_manage
);
612 } /* MW_TestForShutdownInternal */
615 static void _MW_InitialRecv(PRCList
*io_ready
)
617 PRRecvWait
*desc
= (PRRecvWait
*)io_ready
;
618 if ((NULL
== desc
->buffer
.start
)
619 || (0 == desc
->buffer
.length
))
623 desc
->bytesRecv
= (desc
->fd
->methods
->recv
)(
624 desc
->fd
, desc
->buffer
.start
,
625 desc
->buffer
.length
, 0, desc
->timeout
);
626 if (desc
->bytesRecv
< 0) /* SetError should already be there */
627 desc
->outcome
= PR_MW_FAILURE
;
629 } /* _MW_InitialRecv */
633 static void NT_TimeProc(void *arg
)
635 _MDOverlapped
*overlapped
= (_MDOverlapped
*)arg
;
636 PRRecvWait
*desc
= overlapped
->data
.mw
.desc
;
639 if (InterlockedCompareExchange((LONG
*)&desc
->outcome
,
640 (LONG
)PR_MW_TIMEOUT
, (LONG
)PR_MW_PENDING
) != (LONG
)PR_MW_PENDING
)
642 /* This wait recv descriptor has already completed. */
646 /* close the osfd to abort the outstanding async io request */
648 ** Little late to be checking if NSPR's on the bottom of stack,
649 ** but if we don't check, we can't assert that the private data
650 ** is what we think it is.
653 bottom
= PR_GetIdentitiesLayer(desc
->fd
, PR_NSPR_IO_LAYER
);
654 PR_ASSERT(NULL
!= bottom
);
655 if (NULL
!= bottom
) /* now what!?!?! */
657 bottom
->secret
->state
= _PR_FILEDESC_CLOSED
;
658 if (closesocket(bottom
->secret
->md
.osfd
) == SOCKET_ERROR
)
660 fprintf(stderr
, "closesocket failed: %d\n", WSAGetLastError());
661 PR_ASSERT(!"What shall I do?");
667 static PRStatus
NT_HashRemove(PRWaitGroup
*group
, PRFileDesc
*fd
)
671 _PR_MD_LOCK(&group
->mdlock
);
672 waiter
= _MW_LookupInternal(group
, fd
);
675 group
->waiter
->count
-= 1;
678 _PR_MD_UNLOCK(&group
->mdlock
);
679 return (NULL
!= waiter
) ? PR_SUCCESS
: PR_FAILURE
;
682 PRStatus
NT_HashRemoveInternal(PRWaitGroup
*group
, PRFileDesc
*fd
)
686 waiter
= _MW_LookupInternal(group
, fd
);
689 group
->waiter
->count
-= 1;
692 return (NULL
!= waiter
) ? PR_SUCCESS
: PR_FAILURE
;
696 /******************************************************************/
697 /******************************************************************/
698 /********************** The public API portion ********************/
699 /******************************************************************/
700 /******************************************************************/
701 PR_IMPLEMENT(PRStatus
) PR_AddWaitFileDesc(
702 PRWaitGroup
*group
, PRRecvWait
*desc
)
705 PRStatus rv
= PR_FAILURE
;
707 _MDOverlapped
*overlapped
;
714 if (!_pr_initialized
) _PR_ImplicitInitialization();
715 if ((NULL
== group
) && (NULL
== (group
= MW_Init2())))
720 PR_ASSERT(NULL
!= desc
->fd
);
722 desc
->outcome
= PR_MW_PENDING
; /* nice, well known value */
723 desc
->bytesRecv
= 0; /* likewise, though this value is ambiguious */
727 if (_prmw_running
!= group
->state
)
729 /* Not allowed to add after cancelling the group */
730 desc
->outcome
= PR_MW_INTERRUPT
;
731 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
732 PR_Unlock(group
->ml
);
737 _PR_MD_LOCK(&group
->mdlock
);
741 ** If the waiter count is zero at this point, there's no telling
742 ** how long we've been idle. Therefore, initialize the beginning
743 ** of the timing interval. As long as the list doesn't go empty,
744 ** it will maintain itself.
746 if (0 == group
->waiter
->count
)
747 group
->last_poll
= PR_IntervalNow();
751 hrv
= MW_AddHashInternal(desc
, group
->waiter
);
752 if (_prmw_rehash
!= hrv
) break;
753 hrv
= MW_ExpandHashInternal(group
); /* gruesome */
754 if (_prmw_success
!= hrv
) break;
758 _PR_MD_UNLOCK(&group
->mdlock
);
761 PR_NotifyCondVar(group
->new_business
); /* tell the world */
762 rv
= (_prmw_success
== hrv
) ? PR_SUCCESS
: PR_FAILURE
;
763 PR_Unlock(group
->ml
);
766 overlapped
= PR_NEWZAP(_MDOverlapped
);
767 if (NULL
== overlapped
)
769 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
770 NT_HashRemove(group
, desc
->fd
);
773 overlapped
->ioModel
= _MD_MultiWaitIO
;
774 overlapped
->data
.mw
.desc
= desc
;
775 overlapped
->data
.mw
.group
= group
;
776 if (desc
->timeout
!= PR_INTERVAL_NO_TIMEOUT
)
778 overlapped
->data
.mw
.timer
= CreateTimer(
782 if (0 == overlapped
->data
.mw
.timer
)
784 NT_HashRemove(group
, desc
->fd
);
785 PR_DELETE(overlapped
);
787 * XXX It appears that a maximum of 16 timer events can
788 * be outstanding. GetLastError() returns 0 when I try it.
790 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR
, GetLastError());
795 /* Reach to the bottom layer to get the OS fd */
796 bottom
= PR_GetIdentitiesLayer(desc
->fd
, PR_NSPR_IO_LAYER
);
797 PR_ASSERT(NULL
!= bottom
);
800 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
803 hFile
= (HANDLE
)bottom
->secret
->md
.osfd
;
804 if (!bottom
->secret
->md
.io_model_committed
)
807 st
= _md_Associate(hFile
);
809 bottom
->secret
->md
.io_model_committed
= PR_TRUE
;
811 bResult
= ReadFile(hFile
,
813 (DWORD
)desc
->buffer
.length
,
815 &overlapped
->overlapped
);
816 if (FALSE
== bResult
&& (dwError
= GetLastError()) != ERROR_IO_PENDING
)
818 if (desc
->timeout
!= PR_INTERVAL_NO_TIMEOUT
)
820 if (InterlockedCompareExchange((LONG
*)&desc
->outcome
,
821 (LONG
)PR_MW_FAILURE
, (LONG
)PR_MW_PENDING
)
822 == (LONG
)PR_MW_PENDING
)
824 CancelTimer(overlapped
->data
.mw
.timer
);
826 NT_HashRemove(group
, desc
->fd
);
827 PR_DELETE(overlapped
);
829 _PR_MD_MAP_READ_ERROR(dwError
);
835 } /* PR_AddWaitFileDesc */
837 PR_IMPLEMENT(PRRecvWait
*) PR_WaitRecvReady(PRWaitGroup
*group
)
839 PRCList
*io_ready
= NULL
;
841 PRThread
*me
= _PR_MD_CURRENT_THREAD();
842 _MDOverlapped
*overlapped
;
845 if (!_pr_initialized
) _PR_ImplicitInitialization();
846 if ((NULL
== group
) && (NULL
== (group
= MW_Init2()))) goto failed_init
;
850 if (_prmw_running
!= group
->state
)
852 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
856 group
->waiting_threads
+= 1; /* the polling thread is counted */
859 _PR_MD_LOCK(&group
->mdlock
);
860 while (PR_CLIST_IS_EMPTY(&group
->io_ready
))
863 me
->state
= _PR_IO_WAIT
;
864 PR_APPEND_LINK(&me
->waitQLinks
, &group
->wait_list
);
865 if (!_PR_IS_NATIVE_THREAD(me
))
867 _PR_SLEEPQ_LOCK(me
->cpu
);
868 _PR_ADD_SLEEPQ(me
, PR_INTERVAL_NO_TIMEOUT
);
869 _PR_SLEEPQ_UNLOCK(me
->cpu
);
871 _PR_THREAD_UNLOCK(me
);
872 _PR_MD_UNLOCK(&group
->mdlock
);
873 PR_Unlock(group
->ml
);
874 _PR_MD_WAIT(me
, PR_INTERVAL_NO_TIMEOUT
);
875 me
->state
= _PR_RUNNING
;
877 _PR_MD_LOCK(&group
->mdlock
);
878 if (_PR_PENDING_INTERRUPT(me
)) {
879 PR_REMOVE_LINK(&me
->waitQLinks
);
880 _PR_MD_UNLOCK(&group
->mdlock
);
881 me
->flags
&= ~_PR_INTERRUPT
;
882 me
->io_suspended
= PR_FALSE
;
883 PR_SetError(PR_PENDING_INTERRUPT_ERROR
, 0);
887 io_ready
= PR_LIST_HEAD(&group
->io_ready
);
888 PR_ASSERT(io_ready
!= NULL
);
889 PR_REMOVE_LINK(io_ready
);
890 _PR_MD_UNLOCK(&group
->mdlock
);
891 overlapped
= (_MDOverlapped
*)
892 ((char *)io_ready
- offsetof(_MDOverlapped
, data
));
893 io_ready
= &overlapped
->data
.mw
.desc
->internal
;
898 ** If the I/O ready list isn't empty, have this thread
899 ** return with the first receive wait object that's available.
901 if (PR_CLIST_IS_EMPTY(&group
->io_ready
))
904 ** Is there a polling thread yet? If not, grab this thread
907 if (NULL
== group
->poller
)
910 ** This thread will stay do polling until it becomes the only one
911 ** left to service a completion. Then it will return and there will
912 ** be none left to actually poll or to run completions.
914 ** The polling function should only return w/ failure or
915 ** with some I/O ready.
917 if (PR_FAILURE
== _MW_PollInternal(group
)) goto failed_poll
;
922 ** There are four reasons a thread can be awakened from
923 ** a wait on the io_complete condition variable.
924 ** 1. Some I/O has completed, i.e., the io_ready list
926 ** 2. The wait group is canceled.
927 ** 3. The thread is interrupted.
928 ** 4. The current polling thread has to leave and needs
930 ** The logic to find a new polling thread is made more
931 ** complicated by all the other possible events.
932 ** I tried my best to write the logic clearly, but
933 ** it is still full of if's with continue and goto.
938 st
= PR_WaitCondVar(group
->io_complete
, PR_INTERVAL_NO_TIMEOUT
);
939 if (_prmw_running
!= group
->state
)
941 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
944 if (_MW_ABORTED(st
) || (NULL
== group
->poller
)) break;
945 } while (PR_CLIST_IS_EMPTY(&group
->io_ready
));
948 ** The thread is interrupted and has to leave. It might
949 ** have also been awakened to process ready i/o or be the
950 ** new poller. To be safe, if either condition is true,
951 ** we awaken another thread to take its place.
955 if ((NULL
== group
->poller
956 || !PR_CLIST_IS_EMPTY(&group
->io_ready
))
957 && group
->waiting_threads
> 1)
958 PR_NotifyCondVar(group
->io_complete
);
963 ** A new poller is needed, but can I be the new poller?
964 ** If there is no i/o ready, sure. But if there is any
965 ** i/o ready, it has a higher priority. I want to
966 ** process the ready i/o first and wake up another
967 ** thread to be the new poller.
969 if (NULL
== group
->poller
)
971 if (PR_CLIST_IS_EMPTY(&group
->io_ready
))
973 if (group
->waiting_threads
> 1)
974 PR_NotifyCondVar(group
->io_complete
);
977 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group
->io_ready
));
979 io_ready
= PR_LIST_HEAD(&group
->io_ready
);
980 PR_NotifyCondVar(group
->io_taken
);
981 PR_ASSERT(io_ready
!= NULL
);
982 PR_REMOVE_LINK(io_ready
);
983 } while (NULL
== io_ready
);
991 group
->waiting_threads
-= 1;
993 (void)MW_TestForShutdownInternal(group
);
994 PR_Unlock(group
->ml
);
997 if (NULL
!= io_ready
)
999 /* If the operation failed, record the reason why */
1000 switch (((PRRecvWait
*)io_ready
)->outcome
)
1007 _MW_InitialRecv(io_ready
);
1012 _PR_MD_MAP_READ_ERROR(overlapped
->data
.mw
.error
);
1016 PR_SetError(PR_IO_TIMEOUT_ERROR
, 0);
1018 case PR_MW_INTERRUPT
:
1019 PR_SetError(PR_PENDING_INTERRUPT_ERROR
, 0);
1024 if (NULL
!= overlapped
->data
.mw
.timer
)
1026 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1027 != overlapped
->data
.mw
.desc
->timeout
);
1028 CancelTimer(overlapped
->data
.mw
.timer
);
1032 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1033 == overlapped
->data
.mw
.desc
->timeout
);
1035 PR_DELETE(overlapped
);
1038 return (PRRecvWait
*)io_ready
;
1039 } /* PR_WaitRecvReady */
1041 PR_IMPLEMENT(PRStatus
) PR_CancelWaitFileDesc(PRWaitGroup
*group
, PRRecvWait
*desc
)
1044 PRRecvWait
**recv_wait
;
1046 PRStatus rv
= PR_SUCCESS
;
1047 if (NULL
== group
) group
= mw_state
->group
;
1048 PR_ASSERT(NULL
!= group
);
1051 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1057 if (_prmw_running
!= group
->state
)
1059 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
1065 if (InterlockedCompareExchange((LONG
*)&desc
->outcome
,
1066 (LONG
)PR_MW_INTERRUPT
, (LONG
)PR_MW_PENDING
) == (LONG
)PR_MW_PENDING
)
1068 PRFileDesc
*bottom
= PR_GetIdentitiesLayer(desc
->fd
, PR_NSPR_IO_LAYER
);
1069 PR_ASSERT(NULL
!= bottom
);
1072 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1075 bottom
->secret
->state
= _PR_FILEDESC_CLOSED
;
1077 fprintf(stderr
, "cancel wait recv: closing socket\n");
1079 if (closesocket(bottom
->secret
->md
.osfd
) == SOCKET_ERROR
)
1081 fprintf(stderr
, "closesocket failed: %d\n", WSAGetLastError());
1086 if (NULL
!= (recv_wait
= _MW_LookupInternal(group
, desc
->fd
)))
1088 /* it was in the wait table */
1089 _MW_DoneInternal(group
, recv_wait
, PR_MW_INTERRUPT
);
1092 if (!PR_CLIST_IS_EMPTY(&group
->io_ready
))
1094 /* is it already complete? */
1095 PRCList
*head
= PR_LIST_HEAD(&group
->io_ready
);
1098 PRRecvWait
*done
= (PRRecvWait
*)head
;
1099 if (done
== desc
) goto unlock
;
1100 head
= PR_NEXT_LINK(head
);
1101 } while (head
!= &group
->io_ready
);
1103 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1108 PR_Unlock(group
->ml
);
1110 } /* PR_CancelWaitFileDesc */
1112 PR_IMPLEMENT(PRRecvWait
*) PR_CancelWaitGroup(PRWaitGroup
*group
)
1115 PRRecvWait
*recv_wait
= NULL
;
1117 _MDOverlapped
*overlapped
;
1119 PRThread
*me
= _PR_MD_CURRENT_THREAD();
1122 if (NULL
== group
) group
= mw_state
->group
;
1123 PR_ASSERT(NULL
!= group
);
1126 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1131 if (_prmw_stopped
!= group
->state
)
1133 if (_prmw_running
== group
->state
)
1134 group
->state
= _prmw_stopping
; /* so nothing new comes in */
1135 if (0 == group
->waiting_threads
) /* is there anybody else? */
1136 group
->state
= _prmw_stopped
; /* we can stop right now */
1139 PR_NotifyAllCondVar(group
->new_business
);
1140 PR_NotifyAllCondVar(group
->io_complete
);
1142 while (_prmw_stopped
!= group
->state
)
1143 (void)PR_WaitCondVar(group
->mw_manage
, PR_INTERVAL_NO_TIMEOUT
);
1147 _PR_MD_LOCK(&group
->mdlock
);
1149 /* make all the existing descriptors look done/interrupted */
1151 end
= &group
->waiter
->recv_wait
+ group
->waiter
->length
;
1152 for (desc
= &group
->waiter
->recv_wait
; desc
< end
; ++desc
)
1156 if (InterlockedCompareExchange((LONG
*)&(*desc
)->outcome
,
1157 (LONG
)PR_MW_INTERRUPT
, (LONG
)PR_MW_PENDING
)
1158 == (LONG
)PR_MW_PENDING
)
1160 PRFileDesc
*bottom
= PR_GetIdentitiesLayer(
1161 (*desc
)->fd
, PR_NSPR_IO_LAYER
);
1162 PR_ASSERT(NULL
!= bottom
);
1165 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1168 bottom
->secret
->state
= _PR_FILEDESC_CLOSED
;
1170 fprintf(stderr
, "cancel wait group: closing socket\n");
1172 if (closesocket(bottom
->secret
->md
.osfd
) == SOCKET_ERROR
)
1174 fprintf(stderr
, "closesocket failed: %d\n",
1181 while (group
->waiter
->count
> 0)
1183 _PR_THREAD_LOCK(me
);
1184 me
->state
= _PR_IO_WAIT
;
1185 PR_APPEND_LINK(&me
->waitQLinks
, &group
->wait_list
);
1186 if (!_PR_IS_NATIVE_THREAD(me
))
1188 _PR_SLEEPQ_LOCK(me
->cpu
);
1189 _PR_ADD_SLEEPQ(me
, PR_INTERVAL_NO_TIMEOUT
);
1190 _PR_SLEEPQ_UNLOCK(me
->cpu
);
1192 _PR_THREAD_UNLOCK(me
);
1193 _PR_MD_UNLOCK(&group
->mdlock
);
1194 PR_Unlock(group
->ml
);
1195 _PR_MD_WAIT(me
, PR_INTERVAL_NO_TIMEOUT
);
1196 me
->state
= _PR_RUNNING
;
1198 _PR_MD_LOCK(&group
->mdlock
);
1201 for (desc
= &group
->waiter
->recv_wait
; group
->waiter
->count
> 0; ++desc
)
1203 PR_ASSERT(desc
< &group
->waiter
->recv_wait
+ group
->waiter
->length
);
1205 _MW_DoneInternal(group
, desc
, PR_MW_INTERRUPT
);
1209 /* take first element of finished list and return it or NULL */
1210 if (PR_CLIST_IS_EMPTY(&group
->io_ready
))
1211 PR_SetError(PR_GROUP_EMPTY_ERROR
, 0);
1214 PRCList
*head
= PR_LIST_HEAD(&group
->io_ready
);
1215 PR_REMOVE_AND_INIT_LINK(head
);
1217 overlapped
= (_MDOverlapped
*)
1218 ((char *)head
- offsetof(_MDOverlapped
, data
));
1219 head
= &overlapped
->data
.mw
.desc
->internal
;
1220 if (NULL
!= overlapped
->data
.mw
.timer
)
1222 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1223 != overlapped
->data
.mw
.desc
->timeout
);
1224 CancelTimer(overlapped
->data
.mw
.timer
);
1228 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1229 == overlapped
->data
.mw
.desc
->timeout
);
1231 PR_DELETE(overlapped
);
1233 recv_wait
= (PRRecvWait
*)head
;
1237 _PR_MD_UNLOCK(&group
->mdlock
);
1239 PR_Unlock(group
->ml
);
1242 } /* PR_CancelWaitGroup */
1244 PR_IMPLEMENT(PRWaitGroup
*) PR_CreateWaitGroup(PRInt32 size
/* ignored */)
1248 if (NULL
== (wg
= PR_NEWZAP(PRWaitGroup
)))
1250 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
1253 /* the wait group itself */
1254 wg
->ml
= PR_NewLock();
1255 if (NULL
== wg
->ml
) goto failed_lock
;
1256 wg
->io_taken
= PR_NewCondVar(wg
->ml
);
1257 if (NULL
== wg
->io_taken
) goto failed_cvar0
;
1258 wg
->io_complete
= PR_NewCondVar(wg
->ml
);
1259 if (NULL
== wg
->io_complete
) goto failed_cvar1
;
1260 wg
->new_business
= PR_NewCondVar(wg
->ml
);
1261 if (NULL
== wg
->new_business
) goto failed_cvar2
;
1262 wg
->mw_manage
= PR_NewCondVar(wg
->ml
);
1263 if (NULL
== wg
->mw_manage
) goto failed_cvar3
;
1265 PR_INIT_CLIST(&wg
->group_link
);
1266 PR_INIT_CLIST(&wg
->io_ready
);
1268 /* the waiters sequence */
1269 wg
->waiter
= (_PRWaiterHash
*)PR_CALLOC(
1270 sizeof(_PRWaiterHash
) +
1271 (_PR_DEFAULT_HASH_LENGTH
* sizeof(PRRecvWait
*)));
1272 if (NULL
== wg
->waiter
)
1274 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
1277 wg
->waiter
->count
= 0;
1278 wg
->waiter
->length
= _PR_DEFAULT_HASH_LENGTH
;
1281 _PR_MD_NEW_LOCK(&wg
->mdlock
);
1282 PR_INIT_CLIST(&wg
->wait_list
);
1286 PR_APPEND_LINK(&wg
->group_link
, &mw_state
->group_list
);
1291 PR_DestroyCondVar(wg
->mw_manage
);
1293 PR_DestroyCondVar(wg
->new_business
);
1295 PR_DestroyCondVar(wg
->io_complete
);
1297 PR_DestroyCondVar(wg
->io_taken
);
1299 PR_DestroyLock(wg
->ml
);
1306 } /* MW_CreateWaitGroup */
1308 PR_IMPLEMENT(PRStatus
) PR_DestroyWaitGroup(PRWaitGroup
*group
)
1310 PRStatus rv
= PR_SUCCESS
;
1311 if (NULL
== group
) group
= mw_state
->group
;
1312 PR_ASSERT(NULL
!= group
);
1316 if ((group
->waiting_threads
== 0)
1317 && (group
->waiter
->count
== 0)
1318 && PR_CLIST_IS_EMPTY(&group
->io_ready
))
1320 group
->state
= _prmw_stopped
;
1324 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
1327 PR_Unlock(group
->ml
);
1328 if (PR_FAILURE
== rv
) return rv
;
1331 PR_REMOVE_LINK(&group
->group_link
);
1336 * XXX make sure wait_list is empty and waiter is empty.
1337 * These must be checked while holding mdlock.
1339 _PR_MD_FREE_LOCK(&group
->mdlock
);
1342 PR_DELETE(group
->waiter
);
1343 PR_DELETE(group
->polling_list
);
1344 PR_DestroyCondVar(group
->mw_manage
);
1345 PR_DestroyCondVar(group
->new_business
);
1346 PR_DestroyCondVar(group
->io_complete
);
1347 PR_DestroyCondVar(group
->io_taken
);
1348 PR_DestroyLock(group
->ml
);
1349 if (group
== mw_state
->group
) mw_state
->group
= NULL
;
1354 /* The default wait group is not created yet. */
1355 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1359 } /* PR_DestroyWaitGroup */
1361 /**********************************************************************
1362 ***********************************************************************
1363 ******************** Wait group enumerations **************************
1364 ***********************************************************************
1365 **********************************************************************/
1367 PR_IMPLEMENT(PRMWaitEnumerator
*) PR_CreateMWaitEnumerator(PRWaitGroup
*group
)
1369 PRMWaitEnumerator
*enumerator
= PR_NEWZAP(PRMWaitEnumerator
);
1370 if (NULL
== enumerator
) PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
1373 enumerator
->group
= group
;
1374 enumerator
->seal
= _PR_ENUM_SEALED
;
1377 } /* PR_CreateMWaitEnumerator */
1379 PR_IMPLEMENT(PRStatus
) PR_DestroyMWaitEnumerator(PRMWaitEnumerator
* enumerator
)
1381 PR_ASSERT(NULL
!= enumerator
);
1382 PR_ASSERT(_PR_ENUM_SEALED
== enumerator
->seal
);
1383 if ((NULL
== enumerator
) || (_PR_ENUM_SEALED
!= enumerator
->seal
))
1385 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1388 enumerator
->seal
= _PR_ENUM_UNSEALED
;
1389 PR_Free(enumerator
);
1391 } /* PR_DestroyMWaitEnumerator */
1393 PR_IMPLEMENT(PRRecvWait
*) PR_EnumerateWaitGroup(
1394 PRMWaitEnumerator
*enumerator
, const PRRecvWait
*previous
)
1396 PRRecvWait
*result
= NULL
;
1398 /* entry point sanity checking */
1399 PR_ASSERT(NULL
!= enumerator
);
1400 PR_ASSERT(_PR_ENUM_SEALED
== enumerator
->seal
);
1401 if ((NULL
== enumerator
)
1402 || (_PR_ENUM_SEALED
!= enumerator
->seal
)) goto bad_argument
;
1404 /* beginning of enumeration */
1405 if (NULL
== previous
)
1407 if (NULL
== enumerator
->group
)
1409 enumerator
->group
= mw_state
->group
;
1410 if (NULL
== enumerator
->group
)
1412 PR_SetError(PR_GROUP_EMPTY_ERROR
, 0);
1416 enumerator
->waiter
= &enumerator
->group
->waiter
->recv_wait
;
1417 enumerator
->p_timestamp
= enumerator
->group
->p_timestamp
;
1418 enumerator
->thread
= PR_GetCurrentThread();
1419 enumerator
->index
= 0;
1421 /* continuing an enumeration */
1424 PRThread
*me
= PR_GetCurrentThread();
1425 PR_ASSERT(me
== enumerator
->thread
);
1426 if (me
!= enumerator
->thread
) goto bad_argument
;
1428 /* need to restart the enumeration */
1429 if (enumerator
->p_timestamp
!= enumerator
->group
->p_timestamp
)
1430 return PR_EnumerateWaitGroup(enumerator
, NULL
);
1433 /* actually progress the enumeration */
1435 _PR_MD_LOCK(&enumerator
->group
->mdlock
);
1437 PR_Lock(enumerator
->group
->ml
);
1439 while (enumerator
->index
++ < enumerator
->group
->waiter
->length
)
1441 if (NULL
!= (result
= *(enumerator
->waiter
)++)) break;
1444 _PR_MD_UNLOCK(&enumerator
->group
->mdlock
);
1446 PR_Unlock(enumerator
->group
->ml
);
1449 return result
; /* what we live for */
1452 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1453 return NULL
; /* probably ambiguous */
1454 } /* PR_EnumerateWaitGroup */