Checking in changes prior to tagging of version 2.73.
[MogileFS-Server.git] / lib / MogileFS / Server.pm
blob7520e0631867193748e46805be2dbc038f003b75
1 package MogileFS::Server;
2 use strict;
3 use warnings;
4 use vars qw($VERSION);
5 $VERSION = "2.73";
7 =head1 NAME
9 MogileFS::Server - MogileFS (distributed filesystem) server
11 =head1 SYNOPSIS
13 $s = MogileFS::Server->server;
14 $s->run;
16 =cut
18 use IO::Socket;
19 use Symbol;
20 use POSIX;
21 use File::Copy ();
22 use Carp;
23 use File::Basename ();
24 use File::Path ();
25 use Sys::Syslog ();
26 use Time::HiRes ();
27 use Net::Netmask;
28 use List::Util;
29 use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
31 use MogileFS::Util qw(daemonize);
32 use MogileFS::Config;
34 use MogileFS::ProcManager;
35 use MogileFS::Connection::Client;
36 use MogileFS::Connection::Worker;
38 use MogileFS::Worker::Query;
39 use MogileFS::Worker::Delete;
40 use MogileFS::Worker::Replicate;
41 use MogileFS::Worker::Reaper;
42 use MogileFS::Worker::Monitor;
43 use MogileFS::Worker::Fsck;
44 use MogileFS::Worker::JobMaster;
46 use MogileFS::Factory::Domain;
47 use MogileFS::Factory::Class;
48 use MogileFS::Factory::Host;
49 use MogileFS::Factory::Device;
50 use MogileFS::Domain;
51 use MogileFS::Class;
52 use MogileFS::Host;
53 use MogileFS::Device;
55 use MogileFS::HTTPFile;
56 use MogileFS::FID;
57 use MogileFS::DevFID;
59 use MogileFS::Store;
61 use MogileFS::ReplicationPolicy::MultipleHosts;
# Process-wide server singleton, created lazily by server().
my $server;

# Class method: return the shared server object.  The first call blesses an
# empty hash into the invoking package; later calls return that same object
# regardless of the package they are invoked on.
sub server {
    my ($class) = @_;
    $server = bless({}, $class) unless $server;
    return $server;
}
69 # --------------------------------------------------------------------------
70 # instance methods:
71 # --------------------------------------------------------------------------
# Main entry point of the tracker parent process.
#
# Loads the configuration, refuses to run as root unless the configured user
# is "root", checks the database, optionally daemonizes, writes the pidfile,
# installs signal handlers, opens every configured listening socket and then
# hands control to the Danga::Socket event loop.
#
# This method does not return: every path ends in exit() or die().
sub run {
    my ($self) = @_;

    MogileFS::Config->load_config;

    # don't run as root
    die "mogilefsd cannot be run as root\n"
        if $< == 0 && MogileFS->config('user') ne "root";

    MogileFS::Config->check_database;
    daemonize() if MogileFS->config("daemonize");

    MogileFS::ProcManager->set_min_workers('monitor' => 1);

    # open up our log
    Sys::Syslog::openlog('mogilefsd', 'pid', 'daemon');
    Mgd::log('info', 'beginning run');

    unless (MogileFS::ProcManager->write_pidfile) {
        Mgd::log('info', "Couldn't write pidfile, ending run");
        Sys::Syslog::closelog();
        exit 1;
    }

    # Install signal handlers.  TERM and INT share one implementation: the
    # signal is forwarded to every child, the pidfile is removed, the reason
    # is logged and syslog is closed before exiting.
    # NOTE(review): $DEBUG is presumably exported by MogileFS::Config -- confirm.
    for my $signame (qw(TERM INT)) {
        $SIG{$signame} = sub {
            my @children = MogileFS::ProcManager->child_pids;
            print STDERR scalar @children, " children to kill.\n" if $DEBUG;
            my $count = kill( $signame => @children );
            print STDERR "Sent SIG$signame to $count children.\n" if $DEBUG;
            MogileFS::ProcManager->remove_pidfile;
            Mgd::log('info', "ending run due to SIG$signame");
            Sys::Syslog::closelog();

            exit 0;
        };
    }
    $SIG{PIPE} = 'IGNORE';  # catch them by hand

    # setup server sockets to listen for client connections
    my @listeners;
    foreach my $listen (@{ MogileFS->config('listen') }) {
        # Named $listener (not $server) so it does not shadow the file-level
        # server singleton.
        my $listener = IO::Socket::INET->new(LocalAddr => $listen,
                                             Type      => SOCK_STREAM,
                                             Proto     => 'tcp',
                                             Blocking  => 0,
                                             Reuse     => 1,
                                             Listen    => 1024 )
            or die "Error creating socket: $@\n";
        $listener->sockopt(SO_KEEPALIVE, 1);
        $listener->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);

        # save sub to accept a client
        push @listeners, $listener;
        Danga::Socket->AddOtherFds( fileno($listener) => sub {
            # keep accepting until accept() returns false
            while (my $csock = $listener->accept) {
                MogileFS::Connection::Client->new($csock);
            }
        } );
    }

    MogileFS::ProcManager->push_pre_fork_cleanup(sub {
        # so children don't hold server connection open
        close($_) foreach @listeners;
    });

    # setup the post event loop callback to spawn jobs, and the timeout
    Danga::Socket->DebugLevel(3);
    Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
    Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);

    # and now, actually start listening for events
    eval {
        print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
        Danga::Socket->EventLoop();
    };

    # Copy $@ right away so that nothing called below can clobber it.
    if (my $crash = $@) {
        # Pass the error as an argument instead of interpolating it into the
        # format string: the logger treats its first message argument as a
        # printf-style format, so a '%' in the error text would be mangled.
        Mgd::log('err', 'crash log: %s', $crash);
        Sys::Syslog::closelog();
        exit 1;
    }
    Mgd::log('info', 'ending run');
    Sys::Syslog::closelog();
    exit(0);
}
168 # --------------------------------------------------------------------------
170 package MogileFS;
171 # just so MogileFS->config($key) will work:
172 use MogileFS::Config qw(config);
# Registry of global hooks: hook name => callback.
my %hooks;

# Forward a worker command registration, arguments untouched, to the query
# worker class.
sub register_worker_command {
    return MogileFS::Worker::Query::register_command(@_);
}

# Store $callback under $hookname, replacing any previous entry.
# Always returns 1.
sub register_global_hook {
    my ($hookname, $callback) = @_;
    $hooks{$hookname} = $callback;
    return 1;
}

# Forget whatever is registered under the given hook name.
# Always returns 1, whether or not an entry existed.
sub unregister_global_hook {
    my ($hookname) = @_;
    delete $hooks{$hookname};
    return 1;
}

# Invoke the hook registered under the first argument, passing it the
# remaining arguments, and return its result.  Returns undef when no hook is
# registered under that name.
sub run_global_hook {
    my $hookname = shift;
    my $callback = $hooks{$hookname};
    return undef unless defined $callback;
    return $callback->(@_);
}
198 # --------------------------------------------------------------------------
200 package Mgd; # conveniently short name
201 use strict;
202 use warnings;
203 use MogileFS::Config;
204 use MogileFS::Util qw(error fatal debug); # for others calling Mgd::foo()
205 use Socket qw(SOL_SOCKET SO_RCVBUF AF_UNIX SOCK_STREAM PF_UNSPEC);
BEGIN {
    # Detect the receive buffer size for Unix domain stream sockets and
    # publish it as the compile-time constant UNIX_RCVBUF_SIZE.  We assume
    # the size is identical across all Unix domain sockets.
    socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die( "socketpair failed: $!" );

    my $packed = getsockopt($s1, SOL_SOCKET, SO_RCVBUF);
    defined $packed or die "getsockopt: $!";
    close $s1;
    close $s2;

    # getsockopt returns the option value packed as a native int.
    my $size = unpack('i', $packed);
    $size = 8192 unless defined $size && $size > 0;   # fallback size

    # Define the constant through constant->import rather than a string
    # eval: the value is only known at this point inside BEGIN, and a string
    # eval would silently discard any failure to define it.
    require constant;
    constant->import(UNIX_RCVBUF_SIZE => $size);
}
# Shortcut so callers can write Mgd::server() to reach the object returned
# by MogileFS::Server->server.
sub server {
    return MogileFS::Server->server();
}
# Database checking/connecting.
#
# Asks the current store to recheck its handle and then tries to fetch it.
# Returns the handle, or undef when fetching it died.  A failure is reported
# through error() only if the store already had a handle before the recheck.
sub validate_dbh {
    my $storage       = Mgd::get_store();
    my $was_connected = $storage->have_dbh;
    $storage->recheck_dbh();

    my $dbh = eval { $storage->dbh };

    # Doesn't matter what the failure was; workers should retry later.
    if ($@ && $was_connected) {
        error("Error validating master DB: $@");
    }
    return $dbh;
}
# The eventual replacement for callers asking for a dbh directly: they ask
# for the current store, which is a database abstraction layer.
#
# $store is the cached store object; $store_pid is the pid that cached it.
my ($store, $store_pid);

# Return the cached store when this same process cached it; otherwise record
# the current pid and build (and cache) a new MogileFS::Store.
sub get_store {
    unless ($store && $store_pid == $$) {
        $store_pid = $$;
        $store     = MogileFS::Store->new;
    }
    return $store;
}
# Disconnect the cached store's database handle and drop the cached store.
# Returns 1 when a store was cached, 0 when there was nothing to close.
sub close_store {
    return 0 unless $store;

    $store->dbh->disconnect();
    undef $store;
    return 1;
}
# Only for t/ scripts: explicitly install a store without loading a config.
# The given object becomes the cached store and the current pid is recorded
# as its owner.
sub set_store {
    my ($replacement) = @_;
    $store = $replacement;
    return $store_pid = $$;
}
# Return whatever MogileFS::Factory::Domain->get_factory returns.
sub domain_factory {
    return MogileFS::Factory::Domain->get_factory();
}
# Return whatever MogileFS::Factory::Class->get_factory returns.
sub class_factory {
    return MogileFS::Factory::Class->get_factory();
}
# Return whatever MogileFS::Factory::Host->get_factory returns.
sub host_factory {
    return MogileFS::Factory::Host->get_factory();
}
# Return whatever MogileFS::Factory::Device->get_factory returns.
sub device_factory {
    return MogileFS::Factory::Device->get_factory();
}
# Log stuff to syslog or the screen.
#
# Arguments mirror syslog: a priority, a printf-style format string, then
# the format arguments.  When $MogileFS::Config::daemonize is true all
# arguments are handed to Sys::Syslog::syslog unchanged.  Otherwise the
# priority is dropped and a "[timestamp] message" line is printed to the
# currently selected filehandle.
sub log {
    return Sys::Syslog::syslog(@_) if $MogileFS::Config::daemonize;

    # turn on autoflush for the currently selected filehandle
    $| = 1;

    # syslog acts like printf, so format the same way and append a \n;
    # the first parameter (info, warn, critical, etc) is ignored here
    my (undef, $format, @args) = @_;
    $format .= "\n" unless $format =~ /\n$/;
    my $line = @args ? sprintf($format, @args) : $format;

    return print '[', scalar localtime(), '] ', $line;
}
297 __END__
298 #Just for MakeMaker's kinda lame regexp for ABSTRACT_FROM
299 =dummypod
300 mogilefs::server - MogileFS (distributed filesystem) server.