1 package IPC
::Semaphore
::Concurrency
;
8 use POSIX
qw(O_WRONLY O_CREAT O_NONBLOCK O_NOCTTY);
9 use IPC
::SysV
qw(ftok IPC_NOWAIT IPC_CREAT IPC_EXCL S_IRUSR S_IWUSR S_IRGRP S_IWGRP S_IROTH S_IWOTH SEM_UNDO);
13 our @ISA = qw(Exporter);
14 our %EXPORT_TAGS = ( 'all' => [ qw() ] );
15 our @EXPORT_OK = ( @
{ $EXPORT_TAGS{'all'} } );
18 our $VERSION = '0.02';
25 # Only one required argument
26 $args{'path'} = shift;
31 if (!exists($args{'path'})) {
32 carp
"Must supply a path!"; #TODO: Allow private semaphores
36 $args{'project'} = 0 if (!exists($args{'project'}));
37 $args{'count'} = 1 if (!exists($args{'count'}));
38 $args{'value'} = 1 if (!exists($args{'value'})); # TODO: allow array (one value per semaphore)
39 $args{'touch'} = 1 if (!exists($args{'touch'}));
41 my $self = bless {}, $class;
42 $self->{'_args'} = { %args };
44 $self->_touch($self->{'_args'}->{'path'}) if (!-e
$self->{'_args'}->{'path'} || $self->{'_args'}->{'touch'}) or return undef;
45 $self->{'_args'}->{'key'} = $self->_ftok() or return undef;
47 $self->{'_args'}->{'sem'} = $self->_create($self->key()) or return undef;
54 # Create and/or touch the path, returns false if there's an error
57 sysopen(my $fh, $path, O_WRONLY
|O_CREAT
|O_NONBLOCK
|O_NOCTTY
) or carp
"Can't create $path: $!" and return 0;
58 utime(undef, undef, $path) if ($self->{'_args'}->{'touch'});
59 close $fh or carp
"Can't close $path: $!" and return 0;
64 # Create an IPC key, returns result of ftok()
66 return ftok
($self->{'_args'}->{'path'}, $self->{'_args'}->{'project'}) or carp
"Can't create semaphore key: $!" and return undef;
70 # Create the semaphore and assign it its initial value
73 # Presumably the semaphore exists already, so try using it right away
74 my $sem = IPC
::Semaphore
->new($key, 0, 0);
76 # Create a new semaphore...
77 $sem = IPC
::Semaphore
->new($key, $self->{'_args'}->{'count'}, IPC_CREAT
|IPC_EXCL
|S_IRUSR
|S_IWUSR
|S_IRGRP
|S_IWGRP
|S_IROTH
|S_IWOTH
);
79 # Make sure another process did not create it behind our back
80 $sem = IPC
::Semaphore
->new($key, 0, 0) or carp
"Semaphore creation failed!\n";
82 # If we created the semaphore now we assign its initial value
83 for (my $i=0; $i<$self->{'_args'}->{'count'}; $i++) { # TODO: Support array - see above
84 $sem->op($i, $self->{'_args'}->{'value'}, 0);
88 # Return whatever last semget call got us
96 return $self->{'_args'}->{'sem'}->getall();
101 my $nsem = shift or 0;
102 return $self->{'_args'}->{'sem'}->getval($nsem);
107 my $nsem = shift or 0;
108 return $self->{'_args'}->{'sem'}->getncnt($nsem);
113 return $self->{'_args'}->{'sem'}->setall(@_);
118 my ($nsem, $val) = @_;
119 return $self->{'_args'}->{'sem'}->setval($nsem, $val);
124 return $self->{'_args'}->{'sem'}->stat();
129 return $self->{'_args'}->{'sem'}->id();
134 return $self->{'_args'}->{'key'};
141 if (@_ >= 1 && $_[0] =~ /^\d+$/) {
142 # Positional arguments
143 ($args{'sem'}, $args{'wait'}, $args{'max'}, $args{'undo'}) = @_;
148 $args{'sem'} = 0 if (!defined($args{'sem'}));
149 $args{'wait'} = 0 if (!defined($args{'wait'}));
150 $args{'max'} = -1 if (!defined($args{'max'}));
151 $args{'undo'} = 1 if (!defined($args{'undo'}));
153 my $sem = $self->{'_args'}->{'sem'};
154 my $flags = IPC_NOWAIT
;
155 $flags |= SEM_UNDO
if ($args{'undo'});
158 # Get blocked process count here to retain Errno (thus $!) after the first semop call.
159 $ncnt = $self->getncnt($args{'sem'}) if ($args{'wait'});
161 if (($ret = $sem->op($args{'sem'}, -1, $flags))) {
163 } elsif ($args{'wait'}) {
164 return $ret if ($args{'max'} >= 0 && $ncnt >= $args{'max'});
165 # Remove NOWAIT and block
166 $flags ^= IPC_NOWAIT
;
167 return $sem->op($args{'sem'}, -1, $flags);
174 my $number = shift || 0;
175 return $self->{'_args'}->{'sem'}->op($number, 1, 0);
180 return $self->{'_args'}->{'sem'}->remove();
188 IPC::Semaphore::Concurrency - Concurrency guard using semaphores
192 use IPC::Semaphore::Concurrency;
194 my $c = IPC::Semaphore::Concurrency->new('/tmp/sem_file');
199 print "Pass our turn\n";
203 my $c = IPC::Semaphore::Concurrency->new(
204 path => '/tmp/sem_file',
209 if ($c->acquire(0, 1, 0)) {
212 print "Error: Another process is already locked\n";
215 if ($c->acquire(1)) {
216 print "Do other work\n";
221 This module allows you to limit concurrency of specific portions of your
222 code. It can be used to limit resource usage or to give exclusive access to
225 This module is similar in functionality to IPC::Concurrency, with the main
226 differences being that it uses SysV semaphores and allows queuing up
227 processes while others hold the semaphore. There are other differences which
228 give more flexibility in some cases.
230 Generally, error messages on failure can be retrieved with $!.
234 None for now (could change before first Beta)
238 IPC::Semaphore::Concurrency->new( $path );
240 IPC::Semaphore::Concurrency->new(
252 The path to combine with the project id for creating the semaphore key.
253 This file is only used for the inode and device numbers. Will be created
258 The project_id used for generating the key. If nothing else, the
259 semaphore value can be used as changing the count will force generating a
260 new semaphore. Defaults to 0.
264 Number of semaphores to create. Default is 1.
268 Value assigned to the semaphore at creation time. Default is 1.
272 If true, touch the path when creating the semaphore. This can be used to
273 ensure a file in /tmp does not get removed because it is too old.
295 These functions are wrappers around the same functions in IPC::Semaphore.
297 For getval and getncnt, if no argument is given the default is 0.
303 Return the key used to create the semaphore.
309 $c->acquire($sem_number, $wait, $max, $undo);
318 Acquire a semaphore lock. Return true if the lock was acquired.
324 The semaphore number to get. Defaults to 0.
328 If true, block on semaphore acquisition.
332 If C<wait> is true, don't block if C<max> processes or more are waiting
333 for the semaphore. Defaults to -1 (unlimited).
335 You may want to set it to some decent value if blocking on the semaphore
336 to ensure processes don't add up infinitely.
340 If defined and false, the semaphore won't be released automatically when
341 the process exits. You can manually release the semaphore with $c->release().
343 Use with caution as you can block semaphore slots if the process crashes or
344 gets killed. If used together with C<wait>, blocked processes could
345 eventually stack up, leading to resource exhaustion.
353 $c->release($sem_number);
355 Useful only if you turn off the C<undo> option in the C<acquire> function;
356 increments the semaphore by one.
360 =head3 Allow private semaphores
362 =head3 Allow passing an array of values
366 semop(3) and semop(3p) man pages both indicate that C<errno> should be set to
367 C<EAGAIN> if the call would block and C<IPC_NOWAIT> is used, yet in my tests
368 under Linux C<errno> was set to C<EWOULDBLOCK>. See C<example.pl> and
369 C<example2.pl> for examples of paranoiac error checking. YMMV.
371 Please report bugs to C<tguyot@gmail.com>.
375 L<IPC::Semaphore> - The module this is based on.
377 The code repository is mirrored on
378 L<http://repo.or.cz/w/IPC-Semaphore-Concurrency.git>
382 Thomas Guyot-Sionnest <tguyot@gmail.com>
384 =head1 COPYRIGHT AND LICENSE
386 Copyright (C) 2009 Thomas Guyot-Sionnest <tguyot@gmail.com>
388 This library is free software; you can redistribute it and/or modify
389 it under the same terms as Perl itself, either Perl version 5.8.8 or,
390 at your option, any later version of Perl 5 you may have available.