Postgres wasn't honoring a no-wait timeout
[MogileFS-Server.git] / lib / MogileFS / Store / Postgres.pm
blob6a44ee7effd3c751fa943af95f96edcaf48f0a3c
1 package MogileFS::Store::Postgres;
2 # vim: ts=4 sw=4 et ft=perl:
3 use strict;
4 use Digest::MD5 qw(md5); # Used for lockid
5 use DBI;
6 use DBD::Pg;
7 use Sys::Hostname;
8 use MogileFS::Util qw(throw debug error);
9 use MogileFS::Server;
10 use Carp;
11 use base 'MogileFS::Store';
13 # --------------------------------------------------------------------------
14 # Package methods we override
15 # --------------------------------------------------------------------------
# Build a DBD::Pg DSN for database $dbname on $host. The port is appended
# only when a true value is supplied; otherwise the driver default is used.
sub dsn_of_dbhost {
    my ($class, $dbname, $host, $port) = @_;
    my $dsn = "DBI:Pg:dbname=$dbname;host=$host";
    $dsn .= ";port=$port" if $port;
    return $dsn;
}
# DSN for the administrative (root) connection. The requested database name
# is ignored: the root connection always targets the 'postgres' database.
sub dsn_of_root {
    my ($class, undef, $host, $port) = @_;
    return $class->dsn_of_dbhost('postgres', $host, $port);
}
27 # --------------------------------------------------------------------------
28 # Store-related things we override
29 # --------------------------------------------------------------------------
# Returns true: this store wants DBI errors raised. The DBI calls in this
# file are wrapped in eval {} and then inspect $dbh->err / $dbh->state.
# (Presumably consulted by MogileFS::Store when connecting - confirm there.)
sub want_raise_errors { return 1; }
# Given a root DBI connection, create the named database (UTF-8, cloned from
# template0). Succeeds if it was created or already exists; dies otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    unless ($rdbh->do("CREATE DATABASE $dbname TEMPLATE template0 ENCODING 'UTF-8'")) {
        my $reason = $rdbh->errstr;
        # An "already exists" failure is fine; anything else is fatal.
        die "Failed to create database '$dbname': " . $reason . "\n"
            unless $reason =~ /already exists/;
    }
}
# Create the login role $user with password $pass, then make it the owner of
# $dbname. A role that already exists (SQLSTATE 42710, duplicate_object) is
# tolerated; any other failure dies with the driver's error text.
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    eval {
        $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
            undef, $pass);
    };
    # SQLSTATE is a five-character string that may contain letters
    # (e.g. '42P01'), so it must be compared as a string, not numerically.
    die "Failed to create user '$user': ". $rdbh->errstr . "\n"
        if $rdbh->err && $rdbh->state ne '42710';
    # Owning the database is important in Postgres.
    $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
        or die "Failed to grant privileges " . $rdbh->errstr . "\n";
}
# Capability flags: this store reports no REPLACE, no INSERT IGNORE and no
# multi-row INSERT support (it overrides the affected operations itself).
sub can_replace      { return 0; }
sub can_insertignore { return 0; }
sub can_insert_multi { return 0; }

# SQL expression for the current Unix time as a 32-bit integer.
sub unix_timestamp { return "EXTRACT(epoch FROM NOW())::int4"; }
# Initialise the store. This refuses to run against servers older than 8.2,
# because SAVEPOINT / ROLLBACK TO (used by insert_or_update) need 8.2+.
# It also resets the lock-nesting counter checked by get_lock.
sub init {
    my ($self) = @_;
    $self->SUPER::init;
    # get_info(18) is SQL_DBMS_VER; the pattern below assumes its
    # zero-padded "MM.mm.pppp" form.
    my $server_version = $self->dbh->get_info(18);
    if ($server_version =~ /\A0[0-7]\.|08\.0[01]/) {
        die "Postgres is too old! Must use >=postgresql-8.2!";
    }
    $self->{lock_depth} = 0;
}
# Connection hook (presumably invoked after each DBI (re)connect by the
# parent store): chain to the parent, then reset the lock-nesting counter
# checked by get_lock.
sub post_dbi_connect {
    my ($self) = @_;
    $self->SUPER::post_dbi_connect;
    return $self->{lock_depth} = 0;
}
# Read-slave support is not implemented for Postgres.
# TODO: implement check_slave later.
sub can_do_slaves { return 0; }
# True if the last statement on our handle failed with SQLSTATE 40P01
# (deadlock_detected). Returns 0 when there was no error at all, and a
# false empty string for any other error.
sub was_deadlock_error {
    my $self = shift;
    my $handle = $self->dbh;
    return 0 if !$handle->err;
    return $handle->state eq '40P01' ? 1 : '';
}
# True if the last statement on our handle failed on a unique-key collision:
# either SQLSTATE 23505 (unique_violation) or an error message mentioning
# "duplicate". Returns 0 when there was no error at all.
sub was_duplicate_error {
    my $self = shift;
    my $handle = $self->dbh;
    return 0 if !$handle->err;
    my $is_dup = $handle->state eq '23505' || $handle->errstr =~ /duplicate/i;
    return $is_dup ? 1 : '';
}
# Report whether a table named $table is visible through DBI's table_info.
# Returns 1 if found and 0 if not. If the lookup dies, the failure is
# swallowed and the eval's failure value (undef / empty list) is returned.
sub table_exists {
    my ($self, $table) = @_;
    return eval {
        my $info = $self->dbh->table_info(undef, undef, $table, "table");
        my $row  = $info->fetchrow_hashref;
        $row ? 1 : 0;
    };
}
# Register the Postgres-only `lock` table (used by get_lock/release_lock),
# then run the generic schema setup.
sub setup_database {
    my ($self) = @_;
    $self->add_extra_tables('lock');
    return $self->SUPER::setup_database;
}
# Translate MySQL-flavoured CREATE TABLE text into Postgres DDL:
#   - drop UNSIGNED and map TINYINT/MEDIUMINT to SMALLINT;
#   - rewrite "INT NOT NULL AUTO_INCREMENT" as SERIAL;
#   - turn "# " comments into "-- " comments;
#   - strip inline INDEX clauses when an INDEXES_<table> method exists
#     (those indexes are created separately);
#   - widen fid columns to BIGINT when fid_type() says so.
# Dies if no table name can be found in $sql.
sub filter_create_sql {
    my ($self, $sql) = @_;

    for ($sql) {
        s/\bUNSIGNED\b//g;
        s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
        s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
        s/# /-- /g;
    }

    my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
    die "didn't find table" unless $table;

    my $index_method = "INDEXES_$table";
    if ($self->can($index_method)) {
        $sql =~ s!,\s*INDEX\s*(\w+)?\s*\(.+?\)!!mgi;
    }

    # Allow 64-bit ids for file IDs
    if ($self->fid_type eq "BIGINT") {
        $sql =~ s!\bfid\s+INT\b!fid BIGINT!i;
    }

    return $sql;
}
# DDL for the `file` table. filter_create_sql may widen its fid column to
# BIGINT; INDEXES_file supplies the secondary index.
sub TABLE_file {
    return "CREATE TABLE file (
    fid INT NOT NULL,
    PRIMARY KEY (fid),

    dmid SMALLINT NOT NULL,
    dkey VARCHAR(255), -- domain-defined
    UNIQUE (dmid, dkey),

    length BIGINT, -- big limit
    CHECK (length >= 0),

    classid SMALLINT NOT NULL,
    devcount SMALLINT NOT NULL
    )";
}
# Secondary index DDL, created separately from the matching CREATE TABLE.
sub INDEXES_file {
    return "CREATE INDEX file_devcount ON file (dmid,classid,devcount)";
}

sub INDEXES_unreachable_fids {
    return "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)";
}

sub INDEXES_file_on {
    return "CREATE INDEX file_on_devid ON file_on (devid)";
}
# DDL for the `host` table: ids, status, HTTP ports (range-checked),
# and unique hostname / IP / alternate IP.
sub TABLE_host {
    return "CREATE TABLE host (
    hostid SMALLINT NOT NULL,
    PRIMARY KEY (hostid),
    CHECK (hostid >= 0),

    status VARCHAR(8),
    CHECK (status IN ('alive','dead','down')),

    http_port INT DEFAULT 7500,
    CHECK (http_port >= 0),
    CHECK (http_port < 65536),

    http_get_port INT,
    CHECK (http_get_port >= 0),
    CHECK (http_get_port < 65536),

    hostname VARCHAR(40),
    UNIQUE (hostname),
    hostip VARCHAR(15),
    UNIQUE (hostip),
    altip VARCHAR(15),
    UNIQUE (altip),
    altmask VARCHAR(18)
    )";
}
# DDL for the `device` table. Note that mb_asof's CHECK follows it with no
# comma, so it is a column constraint rather than a table constraint.
sub TABLE_device {
    return "CREATE TABLE device (
    devid SMALLINT NOT NULL,
    PRIMARY KEY (devid),
    CHECK (devid >= 0),

    hostid SMALLINT NOT NULL,

    status VARCHAR(8),
    CHECK (status IN ('alive','dead','down','readonly','drain')),
    weight INT DEFAULT 100,

    mb_total INT,
    CHECK (mb_total >= 0),
    mb_used INT,
    CHECK (mb_used >= 0),
    mb_asof INT
    CHECK (mb_asof >= 0)
    )";
}
# Secondary index DDL for the device and queue/log tables.
sub INDEXES_device {
    return "CREATE INDEX device_status ON device (status)";
}

sub INDEXES_file_to_replicate {
    return "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)";
}

sub INDEXES_file_to_delete2 {
    return "CREATE INDEX file_to_delete2_nexttry ON file_to_delete2 (nexttry)";
}

sub INDEXES_file_to_delete_later {
    return "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)";
}

sub INDEXES_fsck_log {
    return "CREATE INDEX fsck_log_utime ON fsck_log (utime)";
}

sub INDEXES_file_to_queue {
    return "CREATE INDEX type_nexttry ON file_to_queue (type,nexttry)";
}
# Extra table (registered in setup_database): get_lock/release_lock
# implement named locks as rows here, keyed by lockid() and tagged with the
# holder's hostname and pid.
sub TABLE_lock {
    return "CREATE TABLE lock (
    lockid INT NOT NULL,
    PRIMARY KEY (lockid),
    CHECK (lockid >= 0),

    hostname VARCHAR(255) NOT NULL,

    pid INT NOT NULL,
    CHECK (pid >= 0),

    acquiredat INT NOT NULL,
    CHECK (acquiredat >= 0)
    )";
}
# Schema upgrade: add host.http_get_port if the column is missing.
sub upgrade_add_host_getport {
    my $self = shift;
    my $have_column = $self->column_type("host", "http_get_port");
    $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)")
        unless $have_column;
}
# Schema upgrade: add host.altip / host.altmask if missing, with a unique
# constraint on altip.
sub upgrade_add_host_altip {
    my $self = shift;
    unless ($self->column_type("host", "altip")) {
        $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
        $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
        # Postgres syntax for an (auto-named) unique constraint, matching the
        # anonymous UNIQUE (altip) in TABLE_host. The MySQL form
        # "ADD UNIQUE <name> (<col>)" is a syntax error in Postgres.
        $self->dowell("ALTER TABLE host ADD UNIQUE (altip)");
    }
}
# Schema upgrade: add device.mb_asof if the column is missing.
sub upgrade_add_device_asof {
    my $self = shift;
    my $have_column = $self->column_type("device", "mb_asof");
    $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)")
        unless $have_column;
}
# Schema upgrade: add device.weight (default 100) if the column is missing.
sub upgrade_add_device_weight {
    my $self = shift;
    my $have_column = $self->column_type("device", "weight");
    $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100")
        unless $have_column;
}
# Schema upgrade: allow 'readonly' in device.status if the current CHECK
# constraint does not mention it.
# NOTE(review): "ALTER TABLE ... MODIFY COLUMN" is MySQL syntax and is not
# accepted by Postgres. The existing status CHECK presumably has to be
# dropped and re-added instead, but its name is not established here.
# TODO confirm before relying on this upgrade path.
sub upgrade_add_device_readonly {
    my $self = shift;
    my $check_clause = $self->column_constraint("device", "status");
    $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))")
        unless $check_clause =~ /readonly/;
}
# Schema upgrade: allow 'drain' in device.status if the current CHECK
# constraint does not mention it.
# NOTE(review): "ALTER TABLE ... MODIFY COLUMN" is MySQL syntax and is not
# accepted by Postgres. The existing status CHECK presumably has to be
# dropped and re-added instead, but its name is not established here.
# TODO confirm before relying on this upgrade path.
sub upgrade_add_device_drain {
    my $self = shift;
    my $check_clause = $self->column_constraint("device", "status");
    $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly','drain'))")
        unless $check_clause =~ /drain/;
}
# Schema upgrade: make server_settings.value a TEXT column unless it already
# is one.
sub upgrade_modify_server_settings_value {
    my $self = shift;
    # Fetch the column type first, then match it. The match must apply to
    # column_type's result, not to the column-name argument.
    my $type = $self->column_type("server_settings", "value");
    unless (defined $type && $type =~ /text/i) {
        $self->dowell("ALTER TABLE server_settings ALTER COLUMN value TYPE TEXT");
    }
}
# Schema upgrade: add file_to_queue.arg (TEXT) if the column is missing.
sub upgrade_add_file_to_queue_arg {
    my $self = shift;
    my $have_column = $self->column_type("file_to_queue", "arg");
    $self->dowell("ALTER TABLE file_to_queue ADD COLUMN arg TEXT")
        unless $have_column;
}
# Nothing to do on Postgres: device ids were never MEDIUMINT here
# (filter_create_sql maps MEDIUMINT to SMALLINT). Always reports success.
sub upgrade_modify_device_size { return 1; }
# Queue every fid in @fidids for deletion in file_to_delete, silently
# skipping fids that are already queued.
# Postgres has no INSERT IGNORE, so each insert runs in its own transaction
# and a duplicate-key failure is ignored. COMMIT on the transaction that the
# failed insert aborted just ends it.
# Return 1 on success; other database errors are raised via condthrow.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";

    foreach my $fidid (@fidids) {
        $self->dbh->begin_work;
        $self->condthrow;
        eval {
            $self->dbh->do($sql, undef, $fidid);
        };
        if ($@ || $self->dbh->err) {
            # A duplicate just means the fid is already queued.
            $self->condthrow unless $self->was_duplicate_error;
        }
        $self->dbh->commit;
    }
    return 1;
}
# Queue every fid in @fidids in file_to_delete2 with nexttry = now. Fids that
# are already queued have their nexttry reset to now instead. This emulates
# REPLACE, which Postgres lacks.
# Each statement runs in its own transaction. Returns 1 on success (as its
# sibling enqueue_fids_to_delete does); other database errors are raised via
# condthrow.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    my $tbl = 'file_to_delete2';
    my $sql1 = sprintf "INSERT INTO %s (fid, nexttry) VALUES (?,%s)", $tbl, $self->unix_timestamp;
    my @dup_fids;

    # Pass 1: insert each fid, remembering the ones that are already queued.
    foreach my $fidid (@fidids) {
        $self->dbh->begin_work;
        $self->condthrow;
        eval {
            $self->dbh->do($sql1, undef, $fidid);
        };
        if ($@ || $self->dbh->err) {
            if ($self->was_duplicate_error) {
                push @dup_fids, $fidid;
            } else {
                $self->condthrow;
            }
        }
        $self->dbh->commit;
    }

    # Pass 2: for already-queued fids, just bump nexttry to now.
    my $sql2 = sprintf 'UPDATE %s SET nexttry = %s WHERE fid IN (?)', $tbl, $self->unix_timestamp;
    foreach my $fidid (@dup_fids) {
        $self->dbh->begin_work;
        $self->condthrow;
        eval {
            $self->dbh->do($sql2, undef, $fidid);
        };
        if ($@ || $self->dbh->err) {
            if ($self->was_duplicate_error) {
                # Ignore, no need of it
            } else {
                $self->condthrow;
            }
        }
        $self->dbh->commit;
    }
    return 1;
}
379 # --------------------------------------------------------------------------
380 # Functions specific to Store::Postgres subclass. Not in parent.
381 # --------------------------------------------------------------------------
# Run an INSERT, silently doing nothing if it collides with an existing
# unique key. This delegates to insert_or_update with the 'IGNORE' sentinel.
# Named args: insert (SQL), insert_vals (arrayref of binds).
sub insert_or_ignore {
    my $self = shift;
    my %arg = $self->_valid_params([qw(insert insert_vals)], @_);
    my @request = (
        insert      => $arg{insert},
        insert_vals => $arg{insert_vals},
        update      => 'IGNORE',
        update_vals => 'IGNORE',
    );
    return $self->insert_or_update(@request);
}
# Emulate an upsert inside one transaction:
#   1. run the INSERT;
#   2. on a unique-key collision, ROLLBACK TO a savepoint taken before the
#      INSERT (so the transaction is usable again) and run the UPDATE,
#      unless update is the 'IGNORE' sentinel;
#   3. raise any remaining error via condthrow, otherwise COMMIT.
# Named args: insert, insert_vals (arrayref), update, update_vals (arrayref).
# Returns 1 on success.
sub insert_or_update {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
    my $dbh  = $self->dbh;

    # Name the savepoint after the INSERT's target table ("INSERT INTO foo ..." -> "foo").
    (my $savepoint = $arg{insert}) =~ s/^INSERT INTO ([^\s]+).*$/$1/g;

    $dbh->begin_work;
    $dbh->do("SAVEPOINT $savepoint");
    eval {
        $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} });
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            $dbh->do("ROLLBACK TO $savepoint");
            $dbh->do($arg{update}, undef, @{ $arg{update_vals} })
                unless $arg{update} eq "IGNORE";
        }
        $self->condthrow;
    }
    $dbh->commit;
    return 1;
}
# Look up information_schema's data_type for column $col of table $table.
# Returns the type string, or undef if no such column is found.
sub column_type {
    my ($self, $table, $col) = @_;
    my $sth = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
    $sth->execute($table, $col);
    while (my $row = $sth->fetchrow_hashref) {
        next unless $row->{column_name} eq $col;
        $sth->finish;
        return $row->{data_type};
    }
    return undef;
}
# Look up the CHECK constraint clause attached to column $col of table
# $table via information_schema. Returns the clause text, or undef if none
# is found.
sub column_constraint {
    my ($self, $table, $col) = @_;
    my $sth = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
    $sth->execute($table, $col);
    while (my $row = $sth->fetchrow_hashref) {
        next unless $row->{column_name} eq $col;
        $sth->finish;
        return $row->{check_clause};
    }
    return undef;
}
# Decide (and cache in $self->{_fid_type}) whether fid columns are INT or
# BIGINT:
#   - MOG_FIDSIZE=big in the environment forces BIGINT;
#   - otherwise an existing file.fid column's type is honoured;
#   - new installs default to BIGINT.
sub fid_type {
    my $self = shift;
    return $self->{_fid_type} if $self->{_fid_type};

    # let people force bigint mode with environment.
    if ($ENV{MOG_FIDSIZE} && $ENV{MOG_FIDSIZE} eq "big") {
        return $self->{_fid_type} = "BIGINT";
    }

    # else, check a maybe-existing table and see if we're in bigint
    # mode already.
    my $file_fid_type = $self->column_type("file", "fid");
    if ($file_fid_type) {
        if ($file_fid_type =~ /bigint/i) {
            return $self->{_fid_type} = "BIGINT";
        } elsif ($file_fid_type =~ /int/i) {
            # Old installs might not have raised the fid type size yet.
            return $self->{_fid_type} = "INT";
        }
    }

    # Used to default to 32bit ints, but this always bites people
    # a few years down the road. So default to 64bit.
    return $self->{_fid_type} = "BIGINT";
}
473 # --------------------------------------------------------------------------
474 # Test suite things we override
475 # --------------------------------------------------------------------------
# Test-suite helper: drop and recreate a scratch database by running
# mogdbsetup, then return a MogileFS::Store connected to it.
# Optional args: dbname, dbhost, dbport, dbuser, dbpass, dbrootuser,
# dbrootpass. The root credentials fall back to the regular ones.
sub new_temp {
    my ($self, %args) = @_;
    my $dbname   = $args{dbname}     || "tmp_mogiletest";
    my $host     = $args{dbhost}     || 'localhost';
    my $port     = $args{dbport}     || 5432;
    my $user     = $args{dbuser}     || 'mogile';
    my $pass     = $args{dbpass}     || '';
    my $rootuser = $args{dbrootuser} || $args{dbuser} || 'postgres';
    my $rootpass = $args{dbrootpass} || $args{dbpass} || '';
    _drop_db($dbname, $host, $port, $rootuser, $rootpass);

    my @args = (
        "$FindBin::Bin/../mogdbsetup", "--yes",
        "--dbname=$dbname", "--type=Postgres",
        "--dbhost=$host", "--dbport=$port",
        "--dbuser=$user",
        "--dbrootuser=$rootuser",
    );
    # Passwords are only passed when non-empty.
    push @args, "--dbpass=$pass"         if $pass ne '';
    push @args, "--dbrootpass=$rootpass" if $rootpass ne '';
    if (system(@args) != 0) {
        die "Failed to run mogdbsetup (" . join(' ', map { "'$_'" } @args) . ").";
    }

    return MogileFS::Store->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname;host=$host;port=$port",
                                                   $user,
                                                   $pass);
}
# Cached root connection used by the test helpers, opened on first use
# against the 'postgres' database. The cache ignores the arguments of later
# calls.
my $rootdbh;
sub _root_dbh {
    my ($host, $port, $rootuser, $rootpass) = @_;
    # Assign and check before returning. Written as "return ... or die", the
    # "or die" would bind to the return statement and could never run.
    $rootdbh ||= DBI->connect("DBI:Pg:dbname=postgres;host=$host;port=$port",
                              $rootuser, $rootpass, { RaiseError => 1 })
        or die "Couldn't connect to local PostgreSQL database as $rootuser";
    return $rootdbh;
}
# Test helper: best-effort DROP DATABASE through the root connection. Any
# error (for example, the database not existing) is swallowed by the eval.
sub _drop_db {
    my ($dbname, $host, $port, $rootuser, $rootpass) = @_;
    my $root_dbh = _root_dbh($host, $port, $rootuser, $rootpass);
    eval { $root_dbh->do("DROP DATABASE $dbname") };
}
527 # --------------------------------------------------------------------------
528 # Data-access things we override
529 # --------------------------------------------------------------------------
# Create class $classname in domain $dmid with mindevcount 2, allocating
# classid = MAX(classid in domain) + 1.
# Returns the new classid (non-zero integer) on success; throws 'dup' on a
# duplicate name; dies on other failures.
# TODO: add locks around entire table (the MAX+1 allocation is racy).
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    my $maxid = $dbh->selectrow_array('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
    my $classid = $maxid + 1;

    my $rv = eval {
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $classid, $classname, 2);
    };
    # A unique-key collision means the class name is already taken.
    throw("dup") if ($@ || $dbh->err) && $self->was_duplicate_error;
    return $classid if $rv;
    $self->condthrow;
    die;
}
# Change the key of file $fidid to $to_key.
# Returns 1 on success, or 0 if the (dmid, dkey) unique key already exists.
# Dies on other errors.
# TODO: need a test to hit the duplicate name error condition
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        return 0 if $self->was_duplicate_error;
        die $@;
    }
    $self->condthrow;
    return 1;
}
# Add a record of fidid existing on devid.
# Returns 1 on success, or 0 if that (fid, devid) row already exists.
# Any other failure is raised (via condthrow, or by re-dying with the eval
# error) instead of being reported as a duplicate.
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
    };
    return 1 if !$@ && !$dbh->err;
    return 0 if $self->was_duplicate_error;
    $self->condthrow;   # raises when the driver recorded an error
    die $@;             # otherwise propagate whatever the eval caught
}
# Recompute file.devcount for $fidid from its file_on rows, inside a
# transaction that first locks the file row with SELECT ... FOR UPDATE.
# Returns 1 if there is no such file row (after rolling back). Otherwise
# returns the UPDATE's row count from DBI do().
sub update_devcount_atomic {
    my ($self, $fidid) = @_;

    $self->dbh->begin_work;
    my $locked = $self->dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
    $self->condthrow;
    if ($locked == 0) {
        # No such file row: nothing to update.
        $self->dbh->rollback;
        return 1;
    }

    my $updated = $self->dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?", undef, $fidid, $fidid);
    $self->condthrow;
    $self->dbh->commit;
    $self->condthrow;
    return $updated;
}
# Try to take the per-fid replication lock "mgfs:fid:<fidid>:replicate"
# (get_lock timeout 1). Returns 1 if acquired, 0 otherwise.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    return $self->get_lock("mgfs:fid:$fidid:replicate", 1) ? 1 : 0;
}
# Release the per-fid replication lock taken by should_begin_replicating_fidid.
# Returns whatever release_lock returns.
sub note_done_replicating {
    my ($self, $fidid) = @_;
    $self->release_lock("mgfs:fid:$fidid:replicate");
}
# Queue $fidid for replication from $from_devid (may be undef).
# nexttry is "now + $in seconds" when $in is true, else the literal 0.
# Any insert error (e.g. the fid already being queued) is swallowed by the eval.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;
    my $dbh = $self->dbh;

    my $nexttry = $in ? $self->unix_timestamp . " + ${in}::int" : 0;

    eval {
        $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
                 undef, $fidid, $from_devid);
    };
}
# Make every deferred replication job (nexttry in the future) due now.
# Returns the row count reported by DBI do().
sub replicate_now {
    my ($self) = @_;
    my $now = $self->unix_timestamp;
    return $self->dbh->do("UPDATE file_to_replicate SET nexttry = $now WHERE nexttry > $now");
}
# Push $fid's replication attempt $in_n_secs seconds into the future and
# bump its failcount.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp
            . " + ?, failcount = failcount + 1 WHERE fid = ?";
    $self->dbh->do($sql, undef, $in_n_secs, $fid);
}
# Create a domain for namespace $name, allocating dmid = MAX(dmid) + 1.
# Returns the new dmid on success; throws 'dup' on a duplicate name; dies
# otherwise.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    my $dmid = ($dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0) + 1;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $dmid, $name);
    };
    throw("dup") if $self->was_duplicate_error;
    return $dmid if $rv;
    die "failed to make domain"; # FIXME: the MAX+1 dmid allocation above is racy.
}
# Store server setting $key = $val (insert or update). An undef $val deletes
# the setting instead. Dies if the handle reports an error afterwards;
# returns 1 otherwise.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;

    if (!defined $val) {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }
    else {
        $self->insert_or_update(
            insert      => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
            insert_vals => [ $key, $val ],
            update      => "UPDATE server_settings SET value = ? WHERE field = ?",
            update_vals => [ $val, $key ],
        );
    }

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}
# Add $val (default 1) to the numeric server setting $key.
# This implementation is race-safe: the row is locked with SELECT ... FOR
# UPDATE for the read-modify-write.
# If the setting is missing (or false), it is set to $val via
# set_server_setting. A non-numeric old value is logged and replaced by $val.
# A false $val is a no-op.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    $self->dbh->begin_work;
    my $value = $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",undef,$key);
    if ($value) {
        if ($value =~ /^\d+$/) {
            $value += $val;
        } else {
            # Log through error(): warning() is not imported by this file
            # (MogileFS::Util is imported for throw/debug/error only), so
            # calling it would die with "Undefined subroutine".
            error("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
            $value = $val;
        }
        my $rv = $self->dbh->do("UPDATE server_settings ".
                                "SET value=? ".
                                "WHERE field=?", undef,
                                $value, $key) > 0;
        $self->dbh->commit;
        return 1 if $rv;
    }
    $self->dbh->rollback; # Release the row-lock
    $self->set_server_setting($key, $val);
}
# Insert a device row for ($devid, $hostid, $status).
# Returns 1 on success. A duplicate devid is handled by conddup (which is
# expected to throw "dup"); any other failure dies.
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $insert = sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
                       $devid, $hostid, $status);
    };
    my $rv = $self->conddup($insert);
    $self->condthrow;
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}
# Record $fidid in unreachable_fids with lastupdate = now, refreshing the
# timestamp if the fid is already listed. Best-effort: any error is
# swallowed by the eval.
sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    eval {
        $self->insert_or_update(
            insert      => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
            insert_vals => [ $fidid ],
            # Match on fid, the key column the INSERT above populates.
            update      => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE fid = ?",
            update_vals => [ $fidid ],
        );
    };
}
# Remove $fidid from the file and tempfile tables, then queue it in the
# delete queue (enqueue_for_delete2 with a delay of 0). Errors are raised
# via condthrow after each step.
sub delete_fidid {
    my ($self, $fidid) = @_;
    for my $table (qw(file tempfile)) {
        $self->dbh->do("DELETE FROM $table WHERE fid=?", undef, $fidid);
        $self->condthrow;
    }
    $self->enqueue_for_delete2($fidid, 0);
    $self->condthrow;
}
# Upsert a file row: insert (fidid, dmid, key, length, classid, devcount=0),
# or, on a fid collision, overwrite those columns and reset devcount to 0.
sub replace_into_file {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    my @insert_vals = @arg{qw(fidid dmid key length classid)};
    my @update_vals = @arg{qw(dmid key length classid fidid)};
    $self->insert_or_update(
        insert      => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
        insert_vals => \@insert_vals,
        update      => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
        update_vals => \@update_vals,
    );
    $self->condthrow;
}
# Given an array of MogileFS::DevFID objects, insert a file_on row for each,
# ignoring rows that are already present. Duplicate-key errors are skipped;
# other errors are raised via condthrow. Returns 1.
# This store reports can_insertignore/can_insert_multi as 0, so rows are
# inserted one at a time through a single prepared statement.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
    foreach my $devfid (@devfids) {
        eval {
            $sth->execute($devfid->fidid, $devfid->devid);
        };
        $self->condthrow unless $self->was_duplicate_error;
    }
    return 1;
}
# Map a lock name to a non-negative 31-bit integer key for the `lock` table.
# The key is the first four bytes of the name's MD5 digest, read big-endian,
# with the top bit cleared. Croaks on an undefined or empty name.
sub lockid {
    my ($lockname) = @_;
    croak("Called with empty lockname! $lockname")
        if !defined $lockname || $lockname eq '';
    my ($head) = unpack 'N', md5($lockname);
    return $head & 0x7fffffff;
}
# Attempt to grab a lock of lockname, and time out after timeout seconds.
# The lock is a row in the `lock` table keyed by lockid($lockname) and tagged
# with this host's name and pid. It should therefore be unique in the space
# of (lockid), as well as the space of (hostname,pid). A duplicate-key error
# means someone else holds it.
# The insert is attempted $timeout + 1 times, sleeping one second between
# attempts (never after the last one). A $timeout of 0 (or undef) is a
# no-wait attempt: exactly one try and no sleep.
# Only one lock may be held at a time: dies on lock recursion.
# Returns 1 on success and 0 on timeout; other database errors are raised
# via condthrow.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    my $lockid = lockid($lockname);
    die "Lock recursion detected (grabbing $lockname ($lockid), had $self->{last_lock} (".lockid($self->{last_lock})."). Bailing out." if $self->{lock_depth};

    debug("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;

    $timeout = 0 unless defined $timeout;
    my $lock = undef;
    while ($timeout >= 0 and not defined($lock)) {
        $lock = eval { $self->dbh->do('INSERT INTO lock (lockid,hostname,pid,acquiredat) VALUES (?, ?, ?, '.$self->unix_timestamp().')', undef, $lockid, hostname(), $$) };
        if ($self->was_duplicate_error) {
            $timeout--;
            # Someone else holds it. Sleep only if another attempt will
            # follow (the loop condition), so a no-wait timeout returns at once.
            sleep 1 if $timeout >= 0;
            next;
        }
        $self->condthrow;
        if (defined $lock and $lock == 1) {
            $self->{lock_depth} = 1;
            $self->{last_lock} = $lockname;
        } else {
            die "Something went horribly wrong while getting lock $lockname";
        }
    }
    # Normalise to the documented 1 (acquired) / 0 (timed out).
    return defined $lock ? 1 : 0;
}
# Attempt to release a lock of lockname by deleting the `lock` row that this
# host/pid holds for it. The lock-nesting counter is reset either way.
# Returns 1 on success and 0 if no lock we have has that name. DBI do()
# reports "no rows" as the true string "0E0", so the count is numified.
# Database errors are raised via condthrow.
sub release_lock {
    my ($self, $lockname) = @_;
    my $lockid = lockid($lockname);
    debug("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;
    my $rv = $self->dbh->do('DELETE FROM lock WHERE lockid=? AND pid=? AND hostname=?', undef, $lockid, $$, hostname());
    debug("Double-release of lock $lockname!") if $self->{lock_depth} != 0 and $rv == 0 and $Mgd::DEBUG >= 2;
    $self->condthrow;
    $self->{lock_depth} = 0;
    return 0 + $rv;
}
835 __END__
837 =head1 NAME
839 MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
841 =head1 SEE ALSO
843 L<MogileFS::Store>