2 * A type which wraps a semaphore
6 * Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
9 #include "multiprocessing.h"
/*
 * Kinds of SemLock object.  The value is stored in the object's `kind`
 * field and checked by the constructor and by the acquire/release paths.
 */
enum {
    RECURSIVE_MUTEX,    /* first enumerator, value 0 */
    SEMAPHORE           /* second enumerator, value 1 */
};
/*
 * Evaluates to true when the lock object pointed to by `o` has a positive
 * `count` and its recorded `last_tid` equals the identifier of the calling
 * thread.  The parameter is parenthesised so that any pointer expression
 * (for example `p + 1`) expands correctly; note that `o` is evaluated twice.
 */
#define ISMINE(o) \
    ((o)->count > 0 && PyThread_get_thread_ident() == (o)->last_tid)
/*
 * Windows flavour of the SEM_* portability macros used by the
 * platform-independent part of this file.
 */

/* Value that marks "no semaphore handle". */
#define SEM_FAILED NULL

#define SEM_CLEAR_ERROR() SetLastError(0)
#define SEM_GET_LAST_ERROR() GetLastError()
/* `name` is accepted for symmetry but does not appear in the expansion. */
#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
/* A true result from CloseHandle() becomes 0, a false one becomes -1. */
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
/* Nothing to unlink here: always evaluates to 0. */
#define SEM_UNLINK(name) 0
41 _GetSemaphoreValue(HANDLE handle
, long *value
)
45 switch (WaitForSingleObject(handle
, 0)) {
47 if (!ReleaseSemaphore(handle
, 1, &previous
))
48 return MP_STANDARD_ERROR
;
49 *value
= previous
+ 1;
55 return MP_STANDARD_ERROR
;
60 semlock_acquire(SemLockObject
*self
, PyObject
*args
, PyObject
*kwds
)
64 PyObject
*timeout_obj
= Py_None
;
65 DWORD res
, full_msecs
, msecs
, start
, ticks
;
67 static char *kwlist
[] = {"block", "timeout", NULL
};
69 if (!PyArg_ParseTupleAndKeywords(args
, kwds
, "|iO", kwlist
,
70 &blocking
, &timeout_obj
))
73 /* calculate timeout */
76 } else if (timeout_obj
== Py_None
) {
77 full_msecs
= INFINITE
;
79 timeout
= PyFloat_AsDouble(timeout_obj
);
82 timeout
*= 1000.0; /* convert to millisecs */
85 } else if (timeout
>= 0.5 * INFINITE
) { /* 25 days */
86 PyErr_SetString(PyExc_OverflowError
,
87 "timeout is too large");
90 full_msecs
= (DWORD
)(timeout
+ 0.5);
93 /* check whether we already own the lock */
94 if (self
->kind
== RECURSIVE_MUTEX
&& ISMINE(self
)) {
99 /* check whether we can acquire without blocking */
100 if (WaitForSingleObject(self
->handle
, 0) == WAIT_OBJECT_0
) {
101 self
->last_tid
= GetCurrentThreadId();
107 start
= GetTickCount();
110 HANDLE handles
[2] = {self
->handle
, sigint_event
};
113 Py_BEGIN_ALLOW_THREADS
114 ResetEvent(sigint_event
);
115 res
= WaitForMultipleObjects(2, handles
, FALSE
, msecs
);
119 if (res
!= WAIT_OBJECT_0
+ 1)
122 /* got SIGINT so give signal handler a chance to run */
125 /* if this is main thread let KeyboardInterrupt be raised */
126 if (PyErr_CheckSignals())
129 /* recalculate timeout */
130 if (msecs
!= INFINITE
) {
131 ticks
= GetTickCount();
132 if ((DWORD
)(ticks
- start
) >= full_msecs
)
134 msecs
= full_msecs
- (ticks
- start
);
143 self
->last_tid
= GetCurrentThreadId();
147 return PyErr_SetFromWindowsErr(0);
149 PyErr_Format(PyExc_RuntimeError
, "WaitForSingleObject() or "
150 "WaitForMultipleObjects() gave unrecognized "
157 semlock_release(SemLockObject
*self
, PyObject
*args
)
159 if (self
->kind
== RECURSIVE_MUTEX
) {
161 PyErr_SetString(PyExc_AssertionError
, "attempt to "
162 "release recursive lock not owned "
166 if (self
->count
> 1) {
170 assert(self
->count
== 1);
173 if (!ReleaseSemaphore(self
->handle
, 1, NULL
)) {
174 if (GetLastError() == ERROR_TOO_MANY_POSTS
) {
175 PyErr_SetString(PyExc_ValueError
, "semaphore or lock "
176 "released too many times");
179 return PyErr_SetFromWindowsErr(0);
187 #else /* !MS_WINDOWS */
/*
 * POSIX flavour of the SEM_* portability macros used by the
 * platform-independent part of this file.
 */

/* There is no error state to reset or query: these expand to nothing / 0. */
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR() 0
/*
 * Create a new named semaphore.  O_CREAT | O_EXCL makes sem_open() fail if
 * the name already exists; the mode is 0600.  `max` does not appear in the
 * expansion.
 */
#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem) sem_close(sem)
#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
#define SEM_UNLINK(name) sem_unlink(name)
200 #if HAVE_BROKEN_SEM_UNLINK
201 # define sem_unlink(name) 0
204 #if !HAVE_SEM_TIMEDWAIT
205 # define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
208 sem_timedwait_save(sem_t
*sem
, struct timespec
*deadline
, PyThreadState
*_save
)
211 unsigned long delay
, difference
;
212 struct timeval now
, tvdeadline
, tvdelay
;
215 tvdeadline
.tv_sec
= deadline
->tv_sec
;
216 tvdeadline
.tv_usec
= deadline
->tv_nsec
/ 1000;
218 for (delay
= 0 ; ; delay
+= 1000) {
220 if (sem_trywait(sem
) == 0)
222 else if (errno
!= EAGAIN
)
223 return MP_STANDARD_ERROR
;
225 /* get current time */
226 if (gettimeofday(&now
, NULL
) < 0)
227 return MP_STANDARD_ERROR
;
229 /* check for timeout */
230 if (tvdeadline
.tv_sec
< now
.tv_sec
||
231 (tvdeadline
.tv_sec
== now
.tv_sec
&&
232 tvdeadline
.tv_usec
<= now
.tv_usec
)) {
234 return MP_STANDARD_ERROR
;
237 /* calculate how much time is left */
238 difference
= (tvdeadline
.tv_sec
- now
.tv_sec
) * 1000000 +
239 (tvdeadline
.tv_usec
- now
.tv_usec
);
241 /* check delay not too long -- maximum is 20 msecs */
244 if (delay
> difference
)
248 tvdelay
.tv_sec
= delay
/ 1000000;
249 tvdelay
.tv_usec
= delay
% 1000000;
250 if (select(0, NULL
, NULL
, NULL
, &tvdelay
) < 0)
251 return MP_STANDARD_ERROR
;
253 /* check for signals */
255 res
= PyErr_CheckSignals();
260 return MP_EXCEPTION_HAS_BEEN_SET
;
265 #endif /* !HAVE_SEM_TIMEDWAIT */
268 semlock_acquire(SemLockObject
*self
, PyObject
*args
, PyObject
*kwds
)
270 int blocking
= 1, res
;
272 PyObject
*timeout_obj
= Py_None
;
273 struct timespec deadline
= {0};
277 static char *kwlist
[] = {"block", "timeout", NULL
};
279 if (!PyArg_ParseTupleAndKeywords(args
, kwds
, "|iO", kwlist
,
280 &blocking
, &timeout_obj
))
283 if (self
->kind
== RECURSIVE_MUTEX
&& ISMINE(self
)) {
288 if (timeout_obj
!= Py_None
) {
289 timeout
= PyFloat_AsDouble(timeout_obj
);
290 if (PyErr_Occurred())
295 if (gettimeofday(&now
, NULL
) < 0) {
296 PyErr_SetFromErrno(PyExc_OSError
);
299 sec
= (long) timeout
;
300 nsec
= (long) (1e9
* (timeout
- sec
) + 0.5);
301 deadline
.tv_sec
= now
.tv_sec
+ sec
;
302 deadline
.tv_nsec
= now
.tv_usec
* 1000 + nsec
;
303 deadline
.tv_sec
+= (deadline
.tv_nsec
/ 1000000000);
304 deadline
.tv_nsec
%= 1000000000;
308 Py_BEGIN_ALLOW_THREADS
309 if (blocking
&& timeout_obj
== Py_None
)
310 res
= sem_wait(self
->handle
);
312 res
= sem_trywait(self
->handle
);
314 res
= sem_timedwait(self
->handle
, &deadline
);
316 if (res
== MP_EXCEPTION_HAS_BEEN_SET
)
318 } while (res
< 0 && errno
== EINTR
&& !PyErr_CheckSignals());
321 if (errno
== EAGAIN
|| errno
== ETIMEDOUT
)
323 else if (errno
== EINTR
)
326 return PyErr_SetFromErrno(PyExc_OSError
);
330 self
->last_tid
= PyThread_get_thread_ident();
336 semlock_release(SemLockObject
*self
, PyObject
*args
)
338 if (self
->kind
== RECURSIVE_MUTEX
) {
340 PyErr_SetString(PyExc_AssertionError
, "attempt to "
341 "release recursive lock not owned "
345 if (self
->count
> 1) {
349 assert(self
->count
== 1);
351 #if HAVE_BROKEN_SEM_GETVALUE
352 /* We will only check properly the maxvalue == 1 case */
353 if (self
->maxvalue
== 1) {
354 /* make sure that already locked */
355 if (sem_trywait(self
->handle
) < 0) {
356 if (errno
!= EAGAIN
) {
357 PyErr_SetFromErrno(PyExc_OSError
);
360 /* it is already locked as expected */
362 /* it was not locked so undo wait and raise */
363 if (sem_post(self
->handle
) < 0) {
364 PyErr_SetFromErrno(PyExc_OSError
);
367 PyErr_SetString(PyExc_ValueError
, "semaphore "
368 "or lock released too many "
376 /* This check is not an absolute guarantee that the semaphore
377 does not rise above maxvalue. */
378 if (sem_getvalue(self
->handle
, &sval
) < 0) {
379 return PyErr_SetFromErrno(PyExc_OSError
);
380 } else if (sval
>= self
->maxvalue
) {
381 PyErr_SetString(PyExc_ValueError
, "semaphore or lock "
382 "released too many times");
388 if (sem_post(self
->handle
) < 0)
389 return PyErr_SetFromErrno(PyExc_OSError
);
395 #endif /* !MS_WINDOWS */
402 newsemlockobject(PyTypeObject
*type
, SEM_HANDLE handle
, int kind
, int maxvalue
)
406 self
= PyObject_New(SemLockObject
, type
);
409 self
->handle
= handle
;
413 self
->maxvalue
= maxvalue
;
414 return (PyObject
*)self
;
418 semlock_new(PyTypeObject
*type
, PyObject
*args
, PyObject
*kwds
)
421 SEM_HANDLE handle
= SEM_FAILED
;
422 int kind
, maxvalue
, value
;
424 static char *kwlist
[] = {"kind", "value", "maxvalue", NULL
};
425 static int counter
= 0;
427 if (!PyArg_ParseTupleAndKeywords(args
, kwds
, "iii", kwlist
,
428 &kind
, &value
, &maxvalue
))
431 if (kind
!= RECURSIVE_MUTEX
&& kind
!= SEMAPHORE
) {
432 PyErr_SetString(PyExc_ValueError
, "unrecognized kind");
436 PyOS_snprintf(buffer
, sizeof(buffer
), "/mp%d-%d", getpid(), counter
++);
439 handle
= SEM_CREATE(buffer
, value
, maxvalue
);
440 /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
441 if (handle
== SEM_FAILED
|| SEM_GET_LAST_ERROR() != 0)
444 if (SEM_UNLINK(buffer
) < 0)
447 result
= newsemlockobject(type
, handle
, kind
, maxvalue
);
454 if (handle
!= SEM_FAILED
)
456 mp_SetError(NULL
, MP_STANDARD_ERROR
);
461 semlock_rebuild(PyTypeObject
*type
, PyObject
*args
)
466 if (!PyArg_ParseTuple(args
, F_SEM_HANDLE
"ii",
467 &handle
, &kind
, &maxvalue
))
470 return newsemlockobject(type
, handle
, kind
, maxvalue
);
474 semlock_dealloc(SemLockObject
* self
)
476 if (self
->handle
!= SEM_FAILED
)
477 SEM_CLOSE(self
->handle
);
482 semlock_count(SemLockObject
*self
)
484 return PyInt_FromLong((long)self
->count
);
488 semlock_ismine(SemLockObject
*self
)
490 /* only makes sense for a lock */
491 return PyBool_FromLong(ISMINE(self
));
495 semlock_getvalue(SemLockObject
*self
)
497 #if HAVE_BROKEN_SEM_GETVALUE
498 PyErr_SetNone(PyExc_NotImplementedError
);
502 if (SEM_GETVALUE(self
->handle
, &sval
) < 0)
503 return mp_SetError(NULL
, MP_STANDARD_ERROR
);
504 /* some posix implementations use negative numbers to indicate
505 the number of waiting threads */
508 return PyInt_FromLong((long)sval
);
513 semlock_iszero(SemLockObject
*self
)
516 #if HAVE_BROKEN_SEM_GETVALUE
517 if (sem_trywait(self
->handle
) < 0) {
520 return mp_SetError(NULL
, MP_STANDARD_ERROR
);
522 if (sem_post(self
->handle
) < 0)
523 return mp_SetError(NULL
, MP_STANDARD_ERROR
);
527 if (SEM_GETVALUE(self
->handle
, &sval
) < 0)
528 return mp_SetError(NULL
, MP_STANDARD_ERROR
);
529 return PyBool_FromLong((long)sval
== 0);
534 semlock_afterfork(SemLockObject
*self
)
544 static PyMethodDef semlock_methods
[] = {
545 {"acquire", (PyCFunction
)semlock_acquire
, METH_VARARGS
| METH_KEYWORDS
,
546 "acquire the semaphore/lock"},
547 {"release", (PyCFunction
)semlock_release
, METH_NOARGS
,
548 "release the semaphore/lock"},
549 {"__enter__", (PyCFunction
)semlock_acquire
, METH_VARARGS
,
550 "enter the semaphore/lock"},
551 {"__exit__", (PyCFunction
)semlock_release
, METH_VARARGS
,
552 "exit the semaphore/lock"},
553 {"_count", (PyCFunction
)semlock_count
, METH_NOARGS
,
554 "num of `acquire()`s minus num of `release()`s for this process"},
555 {"_is_mine", (PyCFunction
)semlock_ismine
, METH_NOARGS
,
556 "whether the lock is owned by this thread"},
557 {"_get_value", (PyCFunction
)semlock_getvalue
, METH_NOARGS
,
558 "get the value of the semaphore"},
559 {"_is_zero", (PyCFunction
)semlock_iszero
, METH_NOARGS
,
560 "returns whether semaphore has value zero"},
561 {"_rebuild", (PyCFunction
)semlock_rebuild
, METH_VARARGS
| METH_CLASS
,
563 {"_after_fork", (PyCFunction
)semlock_afterfork
, METH_NOARGS
,
564 "rezero the net acquisition count after fork()"},
572 static PyMemberDef semlock_members
[] = {
573 {"handle", T_SEM_HANDLE
, offsetof(SemLockObject
, handle
), READONLY
,
575 {"kind", T_INT
, offsetof(SemLockObject
, kind
), READONLY
,
577 {"maxvalue", T_INT
, offsetof(SemLockObject
, maxvalue
), READONLY
,
586 PyTypeObject SemLockType
= {
587 PyVarObject_HEAD_INIT(NULL
, 0)
588 /* tp_name */ "_multiprocessing.SemLock",
589 /* tp_basicsize */ sizeof(SemLockObject
),
591 /* tp_dealloc */ (destructor
)semlock_dealloc
,
597 /* tp_as_number */ 0,
598 /* tp_as_sequence */ 0,
599 /* tp_as_mapping */ 0,
605 /* tp_as_buffer */ 0,
606 /* tp_flags */ Py_TPFLAGS_DEFAULT
| Py_TPFLAGS_BASETYPE
,
607 /* tp_doc */ "Semaphore/Mutex type",
610 /* tp_richcompare */ 0,
611 /* tp_weaklistoffset */ 0,
614 /* tp_methods */ semlock_methods
,
615 /* tp_members */ semlock_members
,
619 /* tp_descr_get */ 0,
620 /* tp_descr_set */ 0,
621 /* tp_dictoffset */ 0,
624 /* tp_new */ semlock_new
,