Don't run workers until Monitor runs
[MogileFS-Server.git] / lib / MogileFS / Connection / Client.pm
blobbf58222c6bb47ab49c9237fc7272925fc0d862b1
1 # A client is a user connection for sending requests to us. Requests
2 # can either be normal user requests to be sent to a QueryWorker
3 # or management requests that start with a !.
5 package MogileFS::Connection::Client;
7 use strict;
8 use Danga::Socket ();
9 use base qw{Danga::Socket};
11 use fields qw{read_buf};
13 sub new {
14 my $self = shift;
15 $self = fields::new($self) unless ref $self;
16 $self->SUPER::new( @_ );
17 $self->watch_read(1);
18 return $self;
21 # Client
22 sub event_read {
23 my MogileFS::Connection::Client $self = shift;
25 my $bref = $self->read(1024);
26 return $self->close unless defined $bref;
27 $self->{read_buf} .= $$bref;
29 while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
30 next unless length $1;
31 $self->handle_request($1);
35 sub handle_request {
36 my ($self, $line) = @_;
38 # if it's just 'help', 'h', '?', or something, do that
39 #if ((substr($line, 0, 1) eq '?') || ($line eq 'help')) {
40 # MogileFS::ProcManager->SendHelp($_[1]);
41 # return;
44 if ($line =~ /^!(\S+)(?:\s+(.+))?$/) {
45 my ($cmd, $args) = ($1, $2);
46 return $self->handle_admin_command($cmd, $args);
49 MogileFS::ProcManager->EnqueueCommandRequest($line, $self);
52 sub handle_admin_command {
53 my ($self, $cmd, $args) = @_;
55 my @out;
56 if ($cmd =~ /^stats$/) {
57 # print out some stats on the queues
58 my $uptime = time() - MogileFS::ProcManager->server_starttime;
59 my $ccount = MogileFS::ProcManager->PendingQueryCount;
60 my $wcount = MogileFS::ProcManager->BoredQueryWorkerCount;
61 my $ipcount = MogileFS::ProcManager->QueriesInProgressCount;
62 my $stats = MogileFS::ProcManager->StatsHash;
63 push @out, "uptime $uptime",
64 "pending_queries $ccount",
65 "processing_queries $ipcount",
66 "bored_queryworkers $wcount",
67 map { "$_ $stats->{$_}" } sort keys %$stats;
69 } elsif ($cmd =~ /^shutdown/) {
70 print "User requested shutdown: $args\n";
71 kill 15, $$; # kill us, that kills our kids
73 } elsif ($cmd =~ /^jobs/) {
74 # dump out a list of running jobs and pids
75 MogileFS::ProcManager->foreach_job(sub {
76 my ($job, $ct, $desired, $pidlist) = @_;
77 push @out, "$job count $ct";
78 push @out, "$job desired $desired";
79 push @out, "$job pids " . join(' ', @$pidlist);
80 });
82 } elsif ($cmd =~ /^want/) {
83 # !want <count> <jobclass>
84 # set the new desired staffing level for a class
85 if ($args =~ /^(\d+)\s+(\S+)/) {
86 my ($count, $job) = ($1, $2);
88 $count = 500 if $count > 500;
90 # now make sure it's a real job
91 if (MogileFS::ProcManager->is_monitor_good) {
92 if (MogileFS::ProcManager->is_valid_job($job)) {
93 MogileFS::ProcManager->request_job_process($job, $count);
94 push @out, "Now desiring $count children doing '$job'.";
95 } else {
96 my $classes = join(", ", MogileFS::ProcManager->valid_jobs);
97 push @out, "ERROR: Invalid class '$job'. Valid classes: $classes";
99 } else {
100 push @out, "ERROR: Monitor has not completed initial run yet\n";
102 } else {
103 push @out, "ERROR: usage: !want <count> <jobclass>";
106 } elsif ($cmd =~ /^to/) {
107 # !to <jobclass> <message>
108 # sends <message> to all children of <jobclass>
109 if ($args =~ /^(\S+)\s+(.+)/) {
110 my $ct = MogileFS::ProcManager->ImmediateSendToChildrenByJob($1, $2);
111 push @out, "Message sent to $ct children.";
113 } else {
114 push @out, "ERROR: usage: !to <jobclass> <message>";
117 } elsif ($cmd =~ /^queue/ || $cmd =~ /^pend/) {
118 MogileFS::ProcManager->foreach_pending_query(sub {
119 my ($client, $query) = @_;
120 push @out, $query;
123 } elsif ($cmd =~ /^watch/) {
124 if (MogileFS::ProcManager->RemoveErrorWatcher($self)) {
125 push @out, "Removed you from watcher list.";
126 } else {
127 MogileFS::ProcManager->AddErrorWatcher($self);
128 push @out, "Added you to watcher list.";
131 } elsif ($cmd =~ /^recent/) {
132 # show the most recent N queries
133 push @out, MogileFS::ProcManager->RecentQueries;
135 } elsif ($cmd =~ /^version/) {
136 # show the most recent N queries
137 push @out, $MogileFS::Server::VERSION;
139 } else {
140 MogileFS::ProcManager->SendHelp($self, $args);
143 $self->write(join("\r\n", @out) . "\r\n") if @out;
144 $self->write(".\r\n");
145 return;
148 # Client
149 sub event_err { my $self = shift; $self->close; }
150 sub event_hup { my $self = shift; $self->close; }
152 # just note that we've died
153 sub close {
154 # mark us as being dead
155 my $self = shift;
156 MogileFS::ProcManager->NoteDeadClient($self);
157 $self->SUPER::close(@_);
162 # Local Variables:
163 # mode: perl
164 # c-basic-indent: 4
165 # indent-tabs-mode: nil
166 # End: