1 package MogileFS
::Store
::Postgres
;
2 # vim: ts=4 sw=4 et ft=perl:
4 use Digest
::MD5
qw(md5); # Used for lockid
8 use MogileFS
::Util
qw(throw debug error);
11 use base
'MogileFS::Store';
13 # --------------------------------------------------------------------------
14 # Package methods we override
15 # --------------------------------------------------------------------------
18 my ($class, $dbname, $host, $port) = @_;
19 return "DBI:Pg:dbname=$dbname;host=$host" . ($port ?
";port=$port" : "");
23 my ($class, $dbname, $host, $port) = @_;
24 return $class->dsn_of_dbhost('postgres', $host, $port);
27 # --------------------------------------------------------------------------
28 # Store-related things we override
29 # --------------------------------------------------------------------------
# Store override: always answers true.
# NOTE(review): presumably consulted by the parent MogileFS::Store when it
# decides whether DBI handles should raise errors -- confirm in the parent.
sub want_raise_errors {
    return 1;
}
# given a root DBI connection, create the named database. succeed
# if it's made, or already exists. die otherwise.
35 sub create_db_if_not_exists
{
36 my ($pkg, $rdbh, $dbname) = @_;
37 if(not $rdbh->do("CREATE DATABASE $dbname TEMPLATE template0 ENCODING 'UTF-8'" )) {
38 die "Failed to create database '$dbname': " . $rdbh->errstr . "\n" if ($rdbh->errstr !~ /already exists/);
42 sub grant_privileges
{
43 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
45 $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
# SQLSTATE is a five-character string code, so it must be compared with a
# string operator. 42710 (duplicate_object) means the role already exists,
# which is not a failure here.
die "Failed to create user '$user': ". $rdbh->errstr . "\n"
    if $rdbh->err && $rdbh->state ne '42710';
# Owning the database in postgres is important
51 $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
52 or die "Failed to grant privileges " . $rdbh->errstr . "\n";
# Store override: always answers false, i.e. this backend does not advertise
# INSERT IGNORE support.
sub can_insertignore {
    return 0;
}
# Store override: always answers false.
# NOTE(review): presumably tells callers not to build multi-row INSERT
# statements for this backend -- confirm against the parent class.
sub can_insert_multi {
    return 0;
}
# Returns the SQL expression (not a value) that yields the current time as
# integer epoch seconds. Callers splice the returned text directly into the
# SQL statements they build.
sub unix_timestamp {
    my $epoch_expr = "EXTRACT(epoch FROM NOW())::int4";
    return $epoch_expr;
}
63 my $database_version = $self->dbh->get_info(18); # SQL_DBMS_VER
64 # We need >=pg-8.2 because we use SAVEPOINT and ROLLBACK TO.
# Group the alternatives so that both are anchored at the start of the
# version string; previously only the "00."-"07." branch was anchored and
# "08.00"/"08.01" could match anywhere inside the string.
die "Postgres is too old! Must use >=postgresql-8.2!"
    if $database_version =~ /\A(?:0[0-7]\.|08\.0[01])/;
66 $self->{lock_depth
} = 0;
69 sub post_dbi_connect
{
71 $self->SUPER::post_dbi_connect
;
72 $self->{lock_depth
} = 0;
# Store override: always answers false, i.e. slave databases are not
# supported by this backend.
sub can_do_slaves {
    return 0;
}
77 # TODO: Implement later
81 sub was_deadlock_error
{
84 return 0 unless $dbh->err;
85 return 1 if $dbh->state eq '40P01';
88 sub was_duplicate_error
{
91 return 0 unless $dbh->err;
92 return 1 if $dbh->state eq '23505' || $dbh->errstr =~ /duplicate/i;
96 my ($self, $table) = @_;
98 my $sth = $self->dbh->table_info(undef, undef, $table, "table");
99 my $rec = $sth->fetchrow_hashref;
106 $self->add_extra_tables('lock');
107 return $self->SUPER::setup_database
;
110 sub filter_create_sql
{
111 my ($self, $sql) = @_;
112 $sql =~ s/\bUNSIGNED\b//g;
113 $sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
114 $sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
117 my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
118 die "didn't find table" unless $table;
119 my $index = sprintf 'INDEXES_%s', $table;
120 if ($self->can($index)) {
121 $sql =~ s!,\s*INDEX\s*(\w+)?\s*\(.+?\)!!mgi;
124 # Allow 64-bit ids for file IDs
125 $sql =~ s!\bfid\s+INT\b!fid BIGINT!i if $self->fid_type eq "BIGINT";
135 dmid SMALLINT NOT NULL,
136 dkey VARCHAR(255), -- domain-defined
139 length BIGINT, -- big limit
142 classid SMALLINT NOT NULL,
143 devcount SMALLINT NOT NULL
148 "CREATE INDEX file_devcount ON file (dmid,classid,devcount)"
151 sub INDEXES_unreachable_fids
{
152 "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)"
155 sub INDEXES_file_on
{
156 "CREATE INDEX file_on_devid ON file_on (devid)"
161 hostid SMALLINT NOT NULL,
162 PRIMARY KEY (hostid),
166 CHECK (status IN ('alive','dead','down')),
168 http_port INT DEFAULT 7500,
169 CHECK (http_port >= 0),
170 CHECK (http_port < 65536),
173 CHECK (http_get_port >= 0),
174 CHECK (http_get_port < 65536),
176 hostname VARCHAR(40),
187 "CREATE TABLE device (
188 devid SMALLINT NOT NULL,
192 hostid SMALLINT NOT NULL,
195 CHECK (status IN ('alive','dead','down','readonly','drain')),
196 weight INT DEFAULT 100,
199 CHECK (mb_total >= 0),
201 CHECK (mb_used >= 0),
208 "CREATE INDEX device_status ON device (status)"
211 sub INDEXES_file_to_replicate
{
212 "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)"
215 sub INDEXES_file_to_delete2
{
216 "CREATE INDEX file_to_delete2_nexttry ON file_to_delete2 (nexttry)"
219 sub INDEXES_file_to_delete_later
{
220 "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)"
223 sub INDEXES_fsck_log
{
224 "CREATE INDEX fsck_log_utime ON fsck_log (utime)"
227 sub INDEXES_file_to_queue
{
228 "CREATE INDEX type_nexttry ON file_to_queue (type,nexttry)"
235 PRIMARY KEY (lockid),
238 hostname VARCHAR(255) NOT NULL,
243 acquiredat INT NOT NULL,
244 CHECK (acquiredat >= 0)
248 sub upgrade_add_host_getport
{
250 # see if they have the get port, else update it
251 unless ($self->column_type("host", "http_get_port")) {
252 $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
256 sub upgrade_add_host_altip
{
258 unless ($self->column_type("host", "altip")) {
259 $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
260 $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
# PostgreSQL spells a named unique constraint as
# "ADD CONSTRAINT name UNIQUE (cols)"; "ADD UNIQUE name (cols)" is MySQL syntax.
$self->dowell("ALTER TABLE host ADD CONSTRAINT altip UNIQUE (altip)");
265 sub upgrade_add_device_asof
{
267 unless ($self->column_type("device", "mb_asof")) {
268 $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
272 sub upgrade_add_device_weight
{
274 unless ($self->column_type("device", "weight")) {
275 $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
279 sub upgrade_add_device_readonly
{
281 unless ($self->column_constraint("device", "status") =~ /readonly/) {
282 $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
286 sub upgrade_add_device_drain
{
288 unless ($self->column_constraint("device", "status") =~ /drain/) {
289 $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly','drain'))");
293 sub upgrade_modify_server_settings_value
{
# Match the pattern against the reported column type. The closing paren was
# misplaced, so the match ran against the literal string "value" and its
# (false) result was passed to column_type() as the column name.
unless (($self->column_type("server_settings", "value") || '') =~ /text/i) {
296 $self->dowell("ALTER TABLE server_settings ALTER COLUMN value TYPE TEXT");
300 sub upgrade_add_file_to_queue_arg
{
302 unless ($self->column_type("file_to_queue", "arg")) {
303 $self->dowell("ALTER TABLE file_to_queue ADD COLUMN arg TEXT");
307 # return 1 on success. die otherwise.
308 sub enqueue_fids_to_delete
{
309 # My kingdom for a real INSERT IGNORE implementation!
310 my ($self, @fidids) = @_;
311 my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";
313 foreach my $fidid (@fidids) {
314 $self->dbh->begin_work;
317 $self->dbh->do($sql, undef, $fidid);
319 if ($@
|| $self->dbh->err) {
320 if ($self->was_duplicate_error) {
331 sub enqueue_fids_to_delete2
{
332 # My kingdom for a real REPLACE implementation!
333 my ($self, @fidids) = @_;
334 my $tbl = 'file_to_delete2';
335 my $sql1 = sprintf "INSERT INTO %s (fid, nexttry) VALUES (?,%s)", $tbl, $self->unix_timestamp;
338 foreach my $fidid (@fidids) {
339 $self->dbh->begin_work;
342 $self->dbh->do($sql1, undef, $fidid);
344 if ($@
|| $self->dbh->err) {
345 if ($self->was_duplicate_error) {
346 push @dup_fids, $fidid;
354 my $sql2 = sprintf 'UPDATE %s SET nexttry = %s WHERE fid IN (?)', $tbl, $self->unix_timestamp;
356 foreach my $fidid (@dup_fids) {
357 $self->dbh->begin_work;
360 $self->dbh->do($sql2, undef, $fidid);
362 if ($@
|| $self->dbh->err) {
363 if ($self->was_duplicate_error) {
364 # Ignore, no need of it
374 # --------------------------------------------------------------------------
375 # Functions specific to Store::Postgres subclass. Not in parent.
376 # --------------------------------------------------------------------------
378 sub insert_or_ignore
{
380 my %arg = $self->_valid_params([qw(insert insert_vals)], @_);
381 return $self->insert_or_update(
382 insert
=> $arg{insert
},
383 insert_vals
=> $arg{insert_vals
},
385 update_vals
=> 'IGNORE',
389 sub insert_or_update
{
391 my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
392 my $dbh = $self->dbh;
393 my $savepoint_name = $arg{insert
};
394 $savepoint_name =~ s/^INSERT INTO ([^\s]+).*$/$1/g;
397 $dbh->do('SAVEPOINT '.$savepoint_name);
399 $dbh->do($arg{insert
}, undef, @
{ $arg{insert_vals
} });
401 if ($@
|| $dbh->err) {
402 if ($self->was_duplicate_error) {
403 $dbh->do('ROLLBACK TO '.$savepoint_name);
404 if($arg{update
} ne "IGNORE") {
405 $dbh->do($arg{update
}, undef, @
{ $arg{update_vals
} });
416 my ($self, $table, $col) = @_;
417 my $sth = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
418 $sth->execute($table,$col);
419 while (my $rec = $sth->fetchrow_hashref) {
420 if ($rec->{column_name
} eq $col) {
422 return $rec->{data_type
};
428 sub column_constraint
{
429 my ($self, $table, $col) = @_;
430 my $sth = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
431 $sth->execute($table,$col);
432 while (my $rec = $sth->fetchrow_hashref) {
433 if ($rec->{column_name
} eq $col) {
435 return $rec->{check_clause
};
443 return $self->{_fid_type
} if $self->{_fid_type
};
445 # let people force bigint mode with environment.
446 if ($ENV{MOG_FIDSIZE
} && $ENV{MOG_FIDSIZE
} eq "big") {
447 return $self->{_fid_type
} = "BIGINT";
450 # else, check a maybe-existing table and see if we're in bigint
452 my $dbh = $self->dbh;
453 my $file_fid_type = $self->column_type("file", "fid");
455 if ($file_fid_type =~ /bigint/i) {
456 return $self->{_fid_type
} = "BIGINT";
457 } elsif($file_fid_type =~ /int/i) {
458 # Old installs might not have raised the fid type size yet.
459 return $self->{_fid_type
} = "INT";
463 # Used to default to 32bit ints, but this always bites people
464 # a few years down the road. So default to 64bit.
465 return $self->{_fid_type
} = "BIGINT";
468 # --------------------------------------------------------------------------
469 # Test suite things we override
470 # --------------------------------------------------------------------------
475 my $dbname = $args{dbname
} || "tmp_mogiletest";
476 my $host = $args{dbhost
} || 'localhost';
477 my $port = $args{dbport
} || 5432;
478 my $user = $args{dbuser
} || 'mogile';
479 my $pass = $args{dbpass
} || '';
480 my $rootuser = $args{dbrootuser
} || $args{dbuser
} || 'postgres';
481 my $rootpass = $args{dbrootpass
} || $args{dbpass
} || '';
482 _drop_db
($dbname,$host,$port,$rootuser,$rootpass);
484 my @args = ( "$FindBin::Bin/../mogdbsetup", "--yes",
485 "--dbname=$dbname", "--type=Postgres",
486 "--dbhost=$host", "--dbport=$port",
488 "--dbrootuser=$rootuser", );
489 push @args, "--dbpass=$pass" unless $pass eq '';
490 push @args, "--dbrootpass=$rootpass" unless $rootpass eq '';
492 and die "Failed to run mogdbsetup (".join(' ',map { "'".$_."'" } @args).").";
494 return MogileFS
::Store
->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname;host=$host;port=$port",
503 my $rootuser = shift;
504 my $rootpass = shift;
# Reuse the cached root handle when there is one, otherwise connect.
# "return EXPR or die ..." parses as "(return EXPR) or die ...", which made
# the die unreachable; assign first, check, then return.
$rootdbh ||= DBI->connect("DBI:Pg:dbname=postgres;host=$host;port=$port", $rootuser, $rootpass, { RaiseError => 1 })
    or die "Couldn't connect to local PostgreSQL database as $rootuser";
return $rootdbh;
513 my $rootuser = shift;
514 my $rootpass = shift;
515 my $root_dbh = _root_dbh
($host, $port, $rootuser, $rootpass);
517 $root_dbh->do("DROP DATABASE $dbname");
522 # --------------------------------------------------------------------------
523 # Data-access things we override
524 # --------------------------------------------------------------------------
526 # return new classid on success (non-zero integer), die on failure
527 # throw 'dup' on duplicate name
528 # TODO: add locks around entire table
530 my ($self, $dmid, $classname) = @_;
531 my $dbh = $self->dbh;
533 # get the max class id in this domain
534 my $maxid = $dbh->selectrow_array
535 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
537 # now insert the new class
539 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
540 undef, $dmid, $maxid + 1, $classname, 2);
542 if ($@
|| $dbh->err) {
543 # first is error code for duplicates
544 if ($self->was_duplicate_error) {
548 return $maxid + 1 if $rv;
553 # returns 1 on success, 0 on duplicate key error, dies on exception
554 # TODO: need a test to hit the duplicate name error condition
556 my ($self, $fidid, $to_key) = @_;
557 my $dbh = $self->dbh;
559 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
560 undef, $to_key, $fidid);
562 if ($@
|| $dbh->err) {
563 # first is error code for duplicates
564 if ($self->was_duplicate_error) {
574 # add a record of fidid existing on devid
575 # returns 1 on success, 0 on duplicate
576 sub add_fidid_to_devid
{
577 my ($self, $fidid, $devid) = @_;
578 my $dbh = $self->dbh;
580 $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
583 return 1 if !$@
&& !$dbh->err;
587 # update the device count for a given fidid
588 sub update_devcount_atomic
{
589 my ($self, $fidid) = @_;
592 $self->dbh->begin_work;
593 $rv = $self->dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
596 $self->dbh->rollback;
599 $rv = $self->dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?", undef, $fidid, $fidid);
606 sub should_begin_replicating_fidid
{
607 my ($self, $fidid) = @_;
608 my $lockname = "mgfs:fid:$fidid:replicate";
609 return 1 if $self->get_lock($lockname, 1);
613 sub note_done_replicating
{
614 my ($self, $fidid) = @_;
615 my $lockname = "mgfs:fid:$fidid:replicate";
616 $self->release_lock($lockname);
619 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
620 sub enqueue_for_replication
{
621 my ($self, $fidid, $from_devid, $in) = @_;
622 my $dbh = $self->dbh;
626 $nexttry = $self->unix_timestamp." + ${in}::int";
630 $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
631 undef, $fidid, $from_devid);
635 # reschedule all deferred replication, return number rescheduled
638 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." WHERE nexttry > ".$self->unix_timestamp);
641 sub reschedule_file_to_replicate_relative
{
642 my ($self, $fid, $in_n_secs) = @_;
643 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." + ?, failcount = failcount + 1 WHERE fid = ?",
644 undef, $in_n_secs, $fid);
647 # creates a new domain, given a domain namespace string. return the dmid on success,
648 # throw 'dup' on duplicate name.
650 my ($self, $name) = @_;
651 my $dbh = $self->dbh;
653 # get the max domain id
654 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
656 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
657 undef, $maxid + 1, $name);
659 if ($self->was_duplicate_error) {
662 return $maxid+1 if $rv;
663 die "failed to make domain"; # FIXME: the above is racy.
666 sub set_server_setting
{
667 my ($self, $key, $val) = @_;
668 my $dbh = $self->dbh;
671 $self->insert_or_update(
672 insert
=> "INSERT INTO server_settings (field, value) VALUES (?, ?)",
673 insert_vals
=> [ $key, $val ],
674 update
=> "UPDATE server_settings SET value = ? WHERE field = ?",
675 update_vals
=> [ $val, $key ],
678 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
681 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
685 # This implementation is race-safe
686 sub incr_server_setting
{
687 my ($self, $key, $val) = @_;
688 $val = 1 unless defined $val;
691 $self->dbh->begin_work;
692 my $value = $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",undef,$key);
694 if($value =~ /^\d+$/) {
697 warning
("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
700 my $rv = $self->dbh->do("UPDATE server_settings ".
702 "WHERE field=?", undef,
707 $self->dbh->rollback; # Release the row-lock
708 $self->set_server_setting($key, $val);
711 # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
713 my ($self, $devid, $hostid, $status) = @_;
714 my $rv = $self->conddup(sub {
715 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
716 $devid, $hostid, $status);
719 die "error making device $devid\n" unless $rv > 0;
723 sub mark_fidid_unreachable
{
724 my ($self, $fidid) = @_;
725 my $dbh = $self->dbh;
728 $self->insert_or_update(
729 insert
=> "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
730 insert_vals
=> [ $fidid ],
# unreachable_fids is keyed by "fid" (see the INSERT column list in this
# same call); "field" is not a column of this table.
update      => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE fid = ?",
732 update_vals
=> [ $fidid ],
738 my ($self, $fidid) = @_;
739 $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
741 $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
743 $self->enqueue_for_delete2($fidid, 0);
747 sub replace_into_file
{
749 my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
750 $self->insert_or_update(
751 insert
=> "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
752 insert_vals
=> [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
753 update
=> "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
754 update_vals
=> [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
759 # given an array of MogileFS::DevFID objects, mass-insert them all
760 # into file_on (ignoring if they're already present)
761 sub mass_insert_file_on
{
762 my ($self, @devfids) = @_;
763 my @qmarks = map { "(?,?)" } @devfids;
764 my @binds = map { $_->fidid, $_->devid } @devfids;
766 my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
769 $sth->execute($_->fidid, $_->devid);
771 $self->condthrow unless $self->was_duplicate_error;
777 croak
("Called with empty lockname! $lockname") unless (defined $lockname && length($lockname) > 0);
778 my $num = unpack 'N',md5
($lockname);
779 return ($num & 0x7fffffff);
782 # attempt to grab a lock of lockname, and timeout after timeout seconds.
783 # the lock should be unique in the space of (lockid), as well the space of
785 # returns 1 on success and 0 on timeout
787 my ($self, $lockname, $timeout) = @_;
788 my $lockid = lockid
($lockname);
789 die "Lock recursion detected (grabbing $lockname ($lockid), had $self->{last_lock} (".lockid
($self->{last_lock
})."). Bailing out." if $self->{lock_depth
};
791 debug
("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG
>= 5;
794 while($timeout > 0 and not defined($lock)) {
795 $lock = eval { $self->dbh->do('INSERT INTO lock (lockid,hostname,pid,acquiredat) VALUES (?, ?, ?, '.$self->unix_timestamp().')', undef, $lockid, hostname
, $$) };
796 if($self->was_duplicate_error) {
802 #$lock = $self->dbh->selectrow_array("SELECT pg_try_advisory_lock(?, ?)", undef, $lockid, $timeout);
803 #warn("$$ Lock result=$lock\n");
804 if (defined $lock and $lock == 1) {
805 $self->{lock_depth
} = 1;
806 $self->{last_lock
} = $lockname;
808 die "Something went horribly wrong while getting lock $lockname";
814 # attempt to release a lock of lockname.
815 # returns 1 on success and 0 if no lock we have has that name.
817 my ($self, $lockname) = @_;
818 my $lockid = lockid
($lockname);
819 debug
("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG
>= 5;
820 #my $rv = $self->dbh->selectrow_array("SELECT pg_advisory_unlock(?)", undef, $lockid);
821 my $rv = $self->dbh->do('DELETE FROM lock WHERE lockid=? AND pid=? AND hostname=?', undef, $lockid, $$, hostname
);
822 debug
("Double-release of lock $lockname!") if $self->{lock_depth
} != 0 and $rv == 0 and $Mgd::DEBUG
>= 2;
824 $self->{lock_depth
} = 0;
834 MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS