allow passing dbhost/dbport to tests under Pg
[MogileFS-Server.git] / lib / MogileFS / Store / Postgres.pm
blob635b55ec772c885e75d3a91a642470f55c4a5729
1 package MogileFS::Store::Postgres;
2 # vim: ts=4 sw=4 et ft=perl:
3 use strict;
4 use Digest::MD5 qw(md5); # Used for lockid
5 use DBI;
6 use DBD::Pg;
7 use Sys::Hostname;
8 use MogileFS::Util qw(throw debug error);
9 use MogileFS::Server;
10 use Carp;
11 use base 'MogileFS::Store';
13 # --------------------------------------------------------------------------
14 # Package methods we override
15 # --------------------------------------------------------------------------
17 sub dsn_of_dbhost {
18 my ($class, $dbname, $host, $port) = @_;
19 return "DBI:Pg:dbname=$dbname;host=$host" . ($port ? ";port=$port" : "");
22 sub dsn_of_root {
23 my ($class, $dbname, $host, $port) = @_;
24 return $class->dsn_of_dbhost('postgres', $host, $port);
27 # --------------------------------------------------------------------------
28 # Store-related things we override
29 # --------------------------------------------------------------------------
31 sub want_raise_errors { 1 }
33 # given a root DBI connection, create the named database. succeed
34 # if it it's made, or already exists. die otherwise.
35 sub create_db_if_not_exists {
36 my ($pkg, $rdbh, $dbname) = @_;
37 if(not $rdbh->do("CREATE DATABASE $dbname TEMPLATE template0 ENCODING 'UTF-8'" )) {
38 die "Failed to create database '$dbname': " . $rdbh->errstr . "\n" if ($rdbh->errstr !~ /already exists/);
42 sub grant_privileges {
43 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
44 eval {
45 $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
46 undef, $pass);
48 die "Failed to create user '$user': ". $rdbh->errstr . "\n"
49 if $rdbh->err && $rdbh->state != '42710';
50 # Owning the database is postgres is important
51 $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
52 or die "Failed to grant privileges " . $rdbh->errstr . "\n";
55 sub can_replace { 0 }
56 sub can_insertignore { 0 }
57 sub can_insert_multi { 0 }
58 sub unix_timestamp { "EXTRACT(epoch FROM NOW())::int4" }
60 sub init {
61 my $self = shift;
62 $self->SUPER::init;
63 my $database_version = $self->dbh->get_info(18); # SQL_DBMS_VER
64 # We need >=pg-8.2 because we use SAVEPOINT and ROLLBACK TO.
65 die "Postgres is too old! Must use >=postgresql-8.2!" if($database_version =~ /\A0[0-7]\.|08\.0[01]/);
66 $self->{lock_depth} = 0;
69 sub post_dbi_connect {
70 my $self = shift;
71 $self->SUPER::post_dbi_connect;
72 $self->{lock_depth} = 0;
75 sub can_do_slaves { 0 }
77 # TODO: Implement later
78 #sub check_slave {
81 sub was_deadlock_error {
82 my $self = shift;
83 my $dbh = $self->dbh;
84 return 0 unless $dbh->err;
85 return 1 if $dbh->state eq '40P01';
88 sub was_duplicate_error {
89 my $self = shift;
90 my $dbh = $self->dbh;
91 return 0 unless $dbh->err;
92 return 1 if $dbh->state eq '23505' || $dbh->errstr =~ /duplicate/i;
95 sub table_exists {
96 my ($self, $table) = @_;
97 return eval {
98 my $sth = $self->dbh->table_info(undef, undef, $table, "table");
99 my $rec = $sth->fetchrow_hashref;
100 return $rec ? 1 : 0;
104 sub setup_database {
105 my $self = shift;
106 $self->add_extra_tables('lock');
107 return $self->SUPER::setup_database;
110 sub filter_create_sql {
111 my ($self, $sql) = @_;
112 $sql =~ s/\bUNSIGNED\b//g;
113 $sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
114 $sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
115 $sql =~ s/# /-- /g;
117 my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
118 die "didn't find table" unless $table;
119 my $index = sprintf 'INDEXES_%s', $table;
120 if ($self->can($index)) {
121 $sql =~ s!,\s*INDEX\s*(\w+)?\s*\(.+?\)!!mgi;
124 # Allow 64-bit ids for file IDs
125 $sql =~ s!\bfid\s+INT\b!fid BIGINT!i if $self->fid_type eq "BIGINT";
127 return $sql;
130 sub TABLE_file {
131 "CREATE TABLE file (
132 fid INT NOT NULL,
133 PRIMARY KEY (fid),
135 dmid SMALLINT NOT NULL,
136 dkey VARCHAR(255), -- domain-defined
137 UNIQUE (dmid, dkey),
139 length BIGINT, -- big limit
140 CHECK (length >= 0),
142 classid SMALLINT NOT NULL,
143 devcount SMALLINT NOT NULL
147 sub INDEXES_file {
148 "CREATE INDEX file_devcount ON file (dmid,classid,devcount)"
151 sub INDEXES_unreachable_fids {
152 "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)"
155 sub INDEXES_file_on {
156 "CREATE INDEX file_on_devid ON file_on (devid)"
159 sub TABLE_host {
160 "CREATE TABLE host (
161 hostid SMALLINT NOT NULL,
162 PRIMARY KEY (hostid),
163 CHECK (hostid >= 0),
165 status VARCHAR(8),
166 CHECK (status IN ('alive','dead','down')),
168 http_port INT DEFAULT 7500,
169 CHECK (http_port >= 0),
170 CHECK (http_port < 65536),
172 http_get_port INT,
173 CHECK (http_get_port >= 0),
174 CHECK (http_get_port < 65536),
176 hostname VARCHAR(40),
177 UNIQUE (hostname),
178 hostip VARCHAR(15),
179 UNIQUE (hostip),
180 altip VARCHAR(15),
181 UNIQUE (altip),
182 altmask VARCHAR(18)
186 sub TABLE_device {
187 "CREATE TABLE device (
188 devid SMALLINT NOT NULL,
189 PRIMARY KEY (devid),
190 CHECK (devid >= 0),
192 hostid SMALLINT NOT NULL,
194 status VARCHAR(8),
195 CHECK (status IN ('alive','dead','down','readonly','drain')),
196 weight INT DEFAULT 100,
198 mb_total INT,
199 CHECK (mb_total >= 0),
200 mb_used INT,
201 CHECK (mb_used >= 0),
202 mb_asof INT
203 CHECK (mb_asof >= 0)
207 sub INDEXES_device {
208 "CREATE INDEX device_status ON device (status)"
211 sub INDEXES_file_to_replicate {
212 "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)"
215 sub INDEXES_file_to_delete2 {
216 "CREATE INDEX file_to_delete2_nexttry ON file_to_delete2 (nexttry)"
219 sub INDEXES_file_to_delete_later {
220 "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)"
223 sub INDEXES_fsck_log {
224 "CREATE INDEX fsck_log_utime ON fsck_log (utime)"
227 sub INDEXES_file_to_queue {
228 "CREATE INDEX type_nexttry ON file_to_queue (type,nexttry)"
231 # Extra table
232 sub TABLE_lock {
233 "CREATE TABLE lock (
234 lockid INT NOT NULL,
235 PRIMARY KEY (lockid),
236 CHECK (lockid >= 0),
238 hostname VARCHAR(255) NOT NULL,
240 pid INT NOT NULL,
241 CHECK (pid >= 0),
243 acquiredat INT NOT NULL,
244 CHECK (acquiredat >= 0)
248 sub upgrade_add_host_getport {
249 my $self = shift;
250 # see if they have the get port, else update it
251 unless ($self->column_type("host", "http_get_port")) {
252 $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
256 sub upgrade_add_host_altip {
257 my $self = shift;
258 unless ($self->column_type("host", "altip")) {
259 $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
260 $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
261 $self->dowell("ALTER TABLE host ADD UNIQUE altip (altip)");
265 sub upgrade_add_device_asof {
266 my $self = shift;
267 unless ($self->column_type("device", "mb_asof")) {
268 $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
272 sub upgrade_add_device_weight {
273 my $self = shift;
274 unless ($self->column_type("device", "weight")) {
275 $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
279 sub upgrade_add_device_readonly {
280 my $self = shift;
281 unless ($self->column_constraint("device", "status") =~ /readonly/) {
282 $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
286 sub upgrade_add_device_drain {
287 my $self = shift;
288 unless ($self->column_constraint("device", "status") =~ /drain/) {
289 $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly','drain'))");
293 sub upgrade_modify_server_settings_value {
294 my $self = shift;
295 unless ($self->column_type("server_settings", "value" =~ /text/i)) {
296 $self->dowell("ALTER TABLE server_settings ALTER COLUMN value TYPE TEXT");
300 sub upgrade_add_file_to_queue_arg {
301 my $self = shift;
302 unless ($self->column_type("file_to_queue", "arg")) {
303 $self->dowell("ALTER TABLE file_to_queue ADD COLUMN arg TEXT");
307 # return 1 on success. die otherwise.
308 sub enqueue_fids_to_delete {
309 # My kingdom for a real INSERT IGNORE implementation!
310 my ($self, @fidids) = @_;
311 my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";
313 foreach my $fidid (@fidids) {
314 $self->dbh->begin_work;
315 $self->condthrow;
316 eval {
317 $self->dbh->do($sql, undef, $fidid);
319 if ($@ || $self->dbh->err) {
320 if ($self->was_duplicate_error) {
321 # Do nothing
322 } else {
323 $self->condthrow;
326 $self->dbh->commit;
331 sub enqueue_fids_to_delete2 {
332 # My kingdom for a real REPLACE implementation!
333 my ($self, @fidids) = @_;
334 my $tbl = 'file_to_delete2';
335 my $sql1 = sprintf "INSERT INTO %s (fid, nexttry) VALUES (?,%s)", $tbl, $self->unix_timestamp;
336 my @dup_fids;
338 foreach my $fidid (@fidids) {
339 $self->dbh->begin_work;
340 $self->condthrow;
341 eval {
342 $self->dbh->do($sql1, undef, $fidid);
344 if ($@ || $self->dbh->err) {
345 if ($self->was_duplicate_error) {
346 push @dup_fids, $fidid;
347 } else {
348 $self->condthrow;
351 $self->dbh->commit;
354 my $sql2 = sprintf 'UPDATE %s SET nexttry = %s WHERE fid IN (?)', $tbl, $self->unix_timestamp;
356 foreach my $fidid (@dup_fids) {
357 $self->dbh->begin_work;
358 $self->condthrow;
359 eval {
360 $self->dbh->do($sql2, undef, $fidid);
362 if ($@ || $self->dbh->err) {
363 if ($self->was_duplicate_error) {
364 # Ignore, no need of it
365 } else {
366 $self->condthrow;
369 $self->dbh->commit;
374 # --------------------------------------------------------------------------
375 # Functions specific to Store::Postgres subclass. Not in parent.
376 # --------------------------------------------------------------------------
378 sub insert_or_ignore {
379 my $self = shift;
380 my %arg = $self->_valid_params([qw(insert insert_vals)], @_);
381 return $self->insert_or_update(
382 insert => $arg{insert},
383 insert_vals => $arg{insert_vals},
384 update => 'IGNORE',
385 update_vals => 'IGNORE',
389 sub insert_or_update {
390 my $self = shift;
391 my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
392 my $dbh = $self->dbh;
393 my $savepoint_name = $arg{insert};
394 $savepoint_name =~ s/^INSERT INTO ([^\s]+).*$/$1/g;
396 $dbh->begin_work;
397 $dbh->do('SAVEPOINT '.$savepoint_name);
398 eval {
399 $dbh->do($arg{insert}, undef, @{ $arg{insert_vals} });
401 if ($@ || $dbh->err) {
402 if ($self->was_duplicate_error) {
403 $dbh->do('ROLLBACK TO '.$savepoint_name);
404 if($arg{update} ne "IGNORE") {
405 $dbh->do($arg{update}, undef, @{ $arg{update_vals} });
408 $self->condthrow;
411 $dbh->commit;
412 return 1;
415 sub column_type {
416 my ($self, $table, $col) = @_;
417 my $sth = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
418 $sth->execute($table,$col);
419 while (my $rec = $sth->fetchrow_hashref) {
420 if ($rec->{column_name} eq $col) {
421 $sth->finish;
422 return $rec->{data_type};
425 return undef;
428 sub column_constraint {
429 my ($self, $table, $col) = @_;
430 my $sth = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
431 $sth->execute($table,$col);
432 while (my $rec = $sth->fetchrow_hashref) {
433 if ($rec->{column_name} eq $col) {
434 $sth->finish;
435 return $rec->{check_clause};
438 return undef;
441 sub fid_type {
442 my $self = shift;
443 return $self->{_fid_type} if $self->{_fid_type};
445 # let people force bigint mode with environment.
446 if ($ENV{MOG_FIDSIZE} && $ENV{MOG_FIDSIZE} eq "big") {
447 return $self->{_fid_type} = "BIGINT";
450 # else, check a maybe-existing table and see if we're in bigint
451 # mode already.
452 my $dbh = $self->dbh;
453 my $file_fid_type = $self->column_type("file", "fid");
454 if($file_fid_type) {
455 if ($file_fid_type =~ /bigint/i) {
456 return $self->{_fid_type} = "BIGINT";
457 } elsif($file_fid_type =~ /int/i) {
458 # Old installs might not have raised the fid type size yet.
459 return $self->{_fid_type} = "INT";
463 # Used to default to 32bit ints, but this always bites people
464 # a few years down the road. So default to 64bit.
465 return $self->{_fid_type} = "BIGINT";
468 # --------------------------------------------------------------------------
469 # Test suite things we override
470 # --------------------------------------------------------------------------
472 sub new_temp {
473 my $self = shift;
474 my %args = @_;
475 my $dbname = $args{dbname} || "tmp_mogiletest";
476 my $host = $args{dbhost} || 'localhost';
477 my $port = $args{dbport} || 5432;
478 my $user = $args{dbuser} || 'mogile';
479 my $pass = $args{dbpass} || '';
480 my $rootuser = $args{dbrootuser} || $args{dbuser} || 'postgres';
481 my $rootpass = $args{dbrootpass} || $args{dbpass} || '';
482 _drop_db($dbname,$host,$port,$rootuser,$rootpass);
484 my @args = ( "$FindBin::Bin/../mogdbsetup", "--yes",
485 "--dbname=$dbname", "--type=Postgres",
486 "--dbhost=$host", "--dbport=$port",
487 "--dbuser=$user",
488 "--dbrootuser=$rootuser", );
489 push @args, "--dbpass=$pass" unless $pass eq '';
490 push @args, "--dbrootpass=$rootpass" unless $rootpass eq '';
491 system(@args)
492 and die "Failed to run mogdbsetup (".join(' ',map { "'".$_."'" } @args).").";
494 return MogileFS::Store->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname;host=$host;port=$port",
495 $user,
496 $pass);
499 my $rootdbh;
500 sub _root_dbh {
501 my $host = shift;
502 my $port = shift;
503 my $rootuser = shift;
504 my $rootpass = shift;
505 return $rootdbh ||= DBI->connect("DBI:Pg:dbname=postgres;host=$host;port=$port", $rootuser, $rootpass, { RaiseError => 1 })
506 or die "Couldn't connect to local PostgreSQL database as $rootuser";
509 sub _drop_db {
510 my $dbname = shift;
511 my $host = shift;
512 my $port = shift;
513 my $rootuser = shift;
514 my $rootpass = shift;
515 my $root_dbh = _root_dbh($host, $port, $rootuser, $rootpass);
516 eval {
517 $root_dbh->do("DROP DATABASE $dbname");
522 # --------------------------------------------------------------------------
523 # Data-access things we override
524 # --------------------------------------------------------------------------
526 # return new classid on success (non-zero integer), die on failure
527 # throw 'dup' on duplicate name
528 # TODO: add locks around entire table
529 sub create_class {
530 my ($self, $dmid, $classname) = @_;
531 my $dbh = $self->dbh;
533 # get the max class id in this domain
534 my $maxid = $dbh->selectrow_array
535 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
537 # now insert the new class
538 my $rv = eval {
539 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
540 undef, $dmid, $maxid + 1, $classname, 2);
542 if ($@ || $dbh->err) {
543 # first is error code for duplicates
544 if ($self->was_duplicate_error) {
545 throw("dup");
548 return $maxid + 1 if $rv;
549 $self->condthrow;
550 die;
553 # returns 1 on success, 0 on duplicate key error, dies on exception
554 # TODO: need a test to hit the duplicate name error condition
555 sub rename_file {
556 my ($self, $fidid, $to_key) = @_;
557 my $dbh = $self->dbh;
558 eval {
559 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
560 undef, $to_key, $fidid);
562 if ($@ || $dbh->err) {
563 # first is error code for duplicates
564 if ($self->was_duplicate_error) {
565 return 0;
566 } else {
567 die $@;
570 $self->condthrow;
571 return 1;
574 # add a record of fidid existing on devid
575 # returns 1 on success, 0 on duplicate
576 sub add_fidid_to_devid {
577 my ($self, $fidid, $devid) = @_;
578 my $dbh = $self->dbh;
579 eval {
580 $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
583 return 1 if !$@ && !$dbh->err;
584 return 0;
587 # update the device count for a given fidid
588 sub update_devcount_atomic {
589 my ($self, $fidid) = @_;
590 my $rv;
592 $self->dbh->begin_work;
593 $rv = $self->dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
594 $self->condthrow;
595 if($rv == 0) {
596 $self->dbh->rollback;
597 return 1;
599 $rv = $self->dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?", undef, $fidid, $fidid);
600 $self->condthrow;
601 $self->dbh->commit;
602 $self->condthrow;
603 return $rv;
606 sub should_begin_replicating_fidid {
607 my ($self, $fidid) = @_;
608 my $lockname = "mgfs:fid:$fidid:replicate";
609 return 1 if $self->get_lock($lockname, 1);
610 return 0;
613 sub note_done_replicating {
614 my ($self, $fidid) = @_;
615 my $lockname = "mgfs:fid:$fidid:replicate";
616 $self->release_lock($lockname);
619 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
620 sub enqueue_for_replication {
621 my ($self, $fidid, $from_devid, $in) = @_;
622 my $dbh = $self->dbh;
624 my $nexttry = 0;
625 if ($in) {
626 $nexttry = $self->unix_timestamp." + ${in}::int";
629 eval {
630 $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
631 undef, $fidid, $from_devid);
635 # reschedule all deferred replication, return number rescheduled
636 sub replicate_now {
637 my ($self) = @_;
638 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." WHERE nexttry > ".$self->unix_timestamp);
641 sub reschedule_file_to_replicate_relative {
642 my ($self, $fid, $in_n_secs) = @_;
643 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." + ?, failcount = failcount + 1 WHERE fid = ?",
644 undef, $in_n_secs, $fid);
647 # creates a new domain, given a domain namespace string. return the dmid on success,
648 # throw 'dup' on duplicate name.
649 sub create_domain {
650 my ($self, $name) = @_;
651 my $dbh = $self->dbh;
653 # get the max domain id
654 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
655 my $rv = eval {
656 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
657 undef, $maxid + 1, $name);
659 if ($self->was_duplicate_error) {
660 throw("dup");
662 return $maxid+1 if $rv;
663 die "failed to make domain"; # FIXME: the above is racy.
666 sub set_server_setting {
667 my ($self, $key, $val) = @_;
668 my $dbh = $self->dbh;
670 if (defined $val) {
671 $self->insert_or_update(
672 insert => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
673 insert_vals => [ $key, $val ],
674 update => "UPDATE server_settings SET value = ? WHERE field = ?",
675 update_vals => [ $val, $key ],
677 } else {
678 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
681 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
682 return 1;
685 # This implementation is race-safe
686 sub incr_server_setting {
687 my ($self, $key, $val) = @_;
688 $val = 1 unless defined $val;
689 return unless $val;
691 $self->dbh->begin_work;
692 my $value = $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",undef,$key);
693 if($value) {
694 if($value =~ /^\d+$/) {
695 $value += $val;
696 } else {
697 warning("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
698 $value = $val;
700 my $rv = $self->dbh->do("UPDATE server_settings ".
701 "SET value=? ".
702 "WHERE field=?", undef,
703 $value, $key) > 0;
704 $self->dbh->commit;
705 return 1 if $rv;
707 $self->dbh->rollback; # Release the row-lock
708 $self->set_server_setting($key, $val);
711 # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
712 sub create_device {
713 my ($self, $devid, $hostid, $status) = @_;
714 my $rv = $self->conddup(sub {
715 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
716 $devid, $hostid, $status);
718 $self->condthrow;
719 die "error making device $devid\n" unless $rv > 0;
720 return 1;
723 sub mark_fidid_unreachable {
724 my ($self, $fidid) = @_;
725 my $dbh = $self->dbh;
727 eval {
728 $self->insert_or_update(
729 insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
730 insert_vals => [ $fidid ],
731 update => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE field = ?",
732 update_vals => [ $fidid ],
737 sub delete_fidid {
738 my ($self, $fidid) = @_;
739 $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
740 $self->condthrow;
741 $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
742 $self->condthrow;
743 $self->enqueue_for_delete2($fidid, 0);
744 $self->condthrow;
747 sub replace_into_file {
748 my $self = shift;
749 my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
750 $self->insert_or_update(
751 insert => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
752 insert_vals => [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
753 update => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
754 update_vals => [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
756 $self->condthrow;
759 # given an array of MogileFS::DevFID objects, mass-insert them all
760 # into file_on (ignoring if they're already present)
761 sub mass_insert_file_on {
762 my ($self, @devfids) = @_;
763 my @qmarks = map { "(?,?)" } @devfids;
764 my @binds = map { $_->fidid, $_->devid } @devfids;
766 my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
767 foreach (@devfids) {
768 eval {
769 $sth->execute($_->fidid, $_->devid);
771 $self->condthrow unless $self->was_duplicate_error;
773 return 1;
775 sub lockid {
776 my ($lockname) = @_;
777 croak("Called with empty lockname! $lockname") unless (defined $lockname && length($lockname) > 0);
778 my $num = unpack 'N',md5($lockname);
779 return ($num & 0x7fffffff);
782 # attempt to grab a lock of lockname, and timeout after timeout seconds.
783 # the lock should be unique in the space of (lockid), as well the space of
784 # (hostname,pid).
785 # returns 1 on success and 0 on timeout
786 sub get_lock {
787 my ($self, $lockname, $timeout) = @_;
788 my $lockid = lockid($lockname);
789 die "Lock recursion detected (grabbing $lockname ($lockid), had $self->{last_lock} (".lockid($self->{last_lock})."). Bailing out." if $self->{lock_depth};
791 debug("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;
793 my $lock = undef;
794 while($timeout > 0 and not defined($lock)) {
795 $lock = eval { $self->dbh->do('INSERT INTO lock (lockid,hostname,pid,acquiredat) VALUES (?, ?, ?, '.$self->unix_timestamp().')', undef, $lockid, hostname, $$) };
796 if($self->was_duplicate_error) {
797 $timeout--;
798 sleep 1;
799 next;
801 $self->condthrow;
802 #$lock = $self->dbh->selectrow_array("SELECT pg_try_advisory_lock(?, ?)", undef, $lockid, $timeout);
803 #warn("$$ Lock result=$lock\n");
804 if (defined $lock and $lock == 1) {
805 $self->{lock_depth} = 1;
806 $self->{last_lock} = $lockname;
807 } else {
808 die "Something went horribly wrong while getting lock $lockname";
811 return $lock;
814 # attempt to release a lock of lockname.
815 # returns 1 on success and 0 if no lock we have has that name.
816 sub release_lock {
817 my ($self, $lockname) = @_;
818 my $lockid = lockid($lockname);
819 debug("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;
820 #my $rv = $self->dbh->selectrow_array("SELECT pg_advisory_unlock(?)", undef, $lockid);
821 my $rv = $self->dbh->do('DELETE FROM lock WHERE lockid=? AND pid=? AND hostname=?', undef, $lockid, $$, hostname);
822 debug("Double-release of lock $lockname!") if $self->{lock_depth} != 0 and $rv == 0 and $Mgd::DEBUG >= 2;
823 $self->condthrow;
824 $self->{lock_depth} = 0;
825 return $rv;
830 __END__
832 =head1 NAME
834 MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
836 =head1 SEE ALSO
838 L<MogileFS::Store>