package MogileFS::Store::Postgres;
# vim: ts=4 sw=4 et ft=perl:
use strict;
use Digest::MD5 qw(md5); # Used for lockid
use DBI;
use DBD::Pg;
use MogileFS::Util qw(throw debug error);
use MogileFS::Server;
use Carp;
use base 'MogileFS::Store';
# --------------------------------------------------------------------------
# Package methods we override
# --------------------------------------------------------------------------
sub dsn_of_dbhost {
my ($class, $dbname, $host) = @_;
return "DBI:Pg:dbname=$dbname;host=$host";
}
sub dsn_of_root {
my ($class, $dbname, $host) = @_;
return "DBI:Pg:dbname=postgres;host=$host";
}
# --------------------------------------------------------------------------
# Store-related things we override
# --------------------------------------------------------------------------
sub want_raise_errors { 1 }
# given a root DBI connection, create the named database. succeed
# if it it's made, or already exists. die otherwise.
sub create_db_if_not_exists {
my ($pkg, $rdbh, $dbname) = @_;
if(not $rdbh->do("CREATE DATABASE $dbname")) {
die "Failed to create database '$dbname': " . $rdbh->errstr . "\n" if ($rdbh->errstr !~ /already exists/);
}
}
sub grant_privileges {
my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
eval {
$rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
undef, $pass);
};
die "Failed to create user '$user': ". $rdbh->errstr . "\n"
if $rdbh->err && $rdbh->state != '42710';
# Owning the database is postgres is important
$rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
or die "Failed to grant privileges " . $rdbh->errstr . "\n";
}
sub can_replace { 0 }
sub can_insertignore { 0 }
sub can_insert_multi { 0 }
sub unix_timestamp { "EXTRACT(epoch FROM NOW())::int4" }
sub init {
my $self = shift;
$self->SUPER::init;
my $database_version = $self->dbh->get_info(18); # SQL_DBMS_VER
# We need >=pg-8.2 because we use SAVEPOINT and ROLLBACK TO.
die "Postgres is too old! Must use >=postgresql-8.2!" if($database_version =~ /\A0[0-7]\.|08\.0[01]/);
$self->{lock_depth} = 0;
}
sub post_dbi_connect {
my $self = shift;
$self->SUPER::post_dbi_connect;
$self->{lock_depth} = 0;
}
sub can_do_slaves { 0 }
# TODO: Implement later
#sub check_slave {
#}
sub was_duplicate_error {
my $self = shift;
my $dbh = $self->dbh;
return 0 unless $dbh->err;
return 1 if $dbh->state == '23505' || $dbh->errstr =~ /duplicate/i;
}
sub table_exists {
my ($self, $table) = @_;
return eval {
my $sth = $self->dbh->table_info(undef, undef, $table, "table");
my $rec = $sth->fetchrow_hashref;
return $rec ? 1 : 0;
};
}
sub setup_database {
my $self = shift;
$self->add_extra_tables('lock');
return $self->SUPER::setup_database;
}
sub filter_create_sql {
my ($self, $sql) = @_;
$sql =~ s/\bUNSIGNED\b//g;
$sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
$sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
$sql =~ s/# /-- /g;
my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
die "didn't find table" unless $table;
if ($self->can("INDEXES_$table")) {
$sql =~ s!,\s+INDEX\s*\(.+?\)!!mg;
}
return $sql;
}
sub TABLE_file {
"CREATE TABLE file (
fid INT NOT NULL,
PRIMARY KEY (fid),
dmid SMALLINT NOT NULL,
dkey VARCHAR(255), -- domain-defined
UNIQUE (dmid, dkey),
length INT, -- 2TiB limit
CHECK (length >= 0),
classid SMALLINT NOT NULL,
devcount SMALLINT NOT NULL
)"
}
sub INDEXES_file {
"CREATE INDEX file_devcount ON file (dmid,classid,devcount)"
}
sub INDEXES_unreachable_fids {
"CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)"
}
sub INDEXES_file_on {
"CREATE INDEX file_on_devid ON file_on (devid)"
}
sub TABLE_host {
"CREATE TABLE host (
hostid SMALLINT NOT NULL,
PRIMARY KEY (hostid),
CHECK (hostid >= 0),
status VARCHAR(8),
CHECK (status IN ('alive','dead','down')),
http_port INT DEFAULT 7500,
CHECK (http_port >= 0),
CHECK (http_port < 65536),
http_get_port INT,
CHECK (http_get_port >= 0),
CHECK (http_get_port < 65536),
hostname VARCHAR(40),
UNIQUE (hostname),
hostip VARCHAR(15),
UNIQUE (hostip),
altip VARCHAR(15),
UNIQUE (altip),
altmask VARCHAR(18)
)"
}
sub TABLE_device {
"CREATE TABLE device (
devid SMALLINT NOT NULL,
PRIMARY KEY (devid),
CHECK (devid >= 0),
hostid SMALLINT NOT NULL,
status VARCHAR(8),
CHECK (status IN ('alive','dead','down','readonly','drain')),
weight INT DEFAULT 100,
mb_total INT,
CHECK (mb_total >= 0),
mb_used INT,
CHECK (mb_used >= 0),
mb_asof INT
CHECK (mb_asof >= 0)
)"
}
sub INDEXES_device {
"CREATE INDEX device_status ON device (status)"
}
sub INDEXES_file_to_replicate {
"CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)"
}
sub INDEXES_file_to_delete_later {
"CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)"
}
sub INDEXES_fsck_log {
"CREATE INDEX fsck_log_utime ON fsck_log (utime)"
}
# Extra table
sub TABLE_lock {
"CREATE TABLE lock (
lockid INT NOT NULL,
PRIMARY KEY (lockid),
CHECK (lockid >= 0),
hostname VARCHAR(255) NOT NULL,
pid INT NOT NULL,
CHECK (pid >= 0),
acquiredat INT NOT NULL,
CHECK (acquiredat >= 0)
)"
}
sub upgrade_add_host_getport {
my $self = shift;
# see if they have the get port, else update it
unless ($self->column_type("host", "http_get_port")) {
$self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
}
}
sub upgrade_add_host_altip {
my $self = shift;
unless ($self->column_type("host", "altip")) {
$self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
$self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
$self->dowell("ALTER TABLE host ADD UNIQUE altip (altip)");
}
}
sub upgrade_add_device_asof {
my $self = shift;
unless ($self->column_type("device", "mb_asof")) {
$self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
}
}
sub upgrade_add_device_weight {
my $self = shift;
unless ($self->column_type("device", "weight")) {
$self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
}
}
sub upgrade_add_device_readonly {
my $self = shift;
unless ($self->column_constraint("device", "status") =~ /readonly/) {
$self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
}
}
sub upgrade_add_device_drain {
my $self = shift;
unless ($self->column_constraint("device", "status") =~ /drain/) {
$self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly','drain'))");
}
}
# return 1 on success. die otherwise.
sub enqueue_fids_to_delete {
my ($self, @fidids) = @_;
my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";
my $savepoint_name = "savepoint_enqueue_fids_to_delete";
$self->dbh->begin_work;
foreach my $fidid (@fidids) {
$self->dbh->do('SAVEPOINT '.$savepoint_name);
$self->condthrow;
eval {
$self->dbh->do($sql, undef, $fidid);
};
if ($@ || $self->dbh->err) {
if ($self->was_duplicate_error) {
$self->dbh->do('ROLLBACK TO '.$savepoint_name);
}
$self->condthrow;
}
}
$self->dbh->commit;
}
# --------------------------------------------------------------------------
# Functions specific to Store::Postgres subclass. Not in parent.
# --------------------------------------------------------------------------
sub insert_or_ignore {
my $self = shift;
my %arg = $self->_valid_params([qw(insert insert_vals)], @_);
return $self->insert_or_update(
insert => $arg{insert},
insert_vals => $arg{insert_vals},
update => 'IGNORE',
update_vals => 'IGNORE',
);
}
sub insert_or_update {
my $self = shift;
my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
my $dbh = $self->dbh;
my $savepoint_name = $arg{insert};
$savepoint_name =~ s/^INSERT INTO ([^\s]+).*$/$1/g;
$dbh->begin_work;
$dbh->do('SAVEPOINT '.$savepoint_name);
eval {
$dbh->do($arg{insert}, undef, @{ $arg{insert_vals} });
};
if ($@ || $dbh->err) {
if ($self->was_duplicate_error) {
$dbh->do('ROLLBACK TO '.$savepoint_name);
if($arg{update} ne "IGNORE") {
$dbh->do($arg{update}, undef, @{ $arg{update_vals} });
}
}
$self->condthrow;
}
$dbh->commit;
return 1;
}
sub column_type {
my ($self, $table, $col) = @_;
my $sth = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
$sth->execute($table,$col);
while (my $rec = $sth->fetchrow_hashref) {
if ($rec->{column_name} eq $col) {
$sth->finish;
return $rec->{data_type};
}
}
return undef;
}
sub column_constraint {
my ($self, $table, $col) = @_;
my $sth = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
$sth->execute($table,$col);
while (my $rec = $sth->fetchrow_hashref) {
if ($rec->{column_name} eq $col) {
$sth->finish;
return $rec->{check_clause};
}
}
return undef;
}
# --------------------------------------------------------------------------
# Test suite things we override
# --------------------------------------------------------------------------
sub new_temp {
my $dbname = "tmp_mogiletest";
_drop_db($dbname);
system("$FindBin::Bin/../mogdbsetup", "--yes", "--dbname=$dbname", "--type=Postgres", "--dbrootuser=postgres")
and die "Failed to run mogdbsetup ($FindBin::Bin/../mogdbsetup).";
return MogileFS::Store->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname",
"mogile",
"");
}
my $rootdbh;
sub _root_dbh {
return $rootdbh ||= DBI->connect("DBI:Pg:dbname=postgres", "postgres", "", { RaiseError => 1 })
or die "Couldn't connect to local PostgreSQL database as postgres";
}
sub _drop_db {
my $dbname = shift;
my $root_dbh = _root_dbh();
eval {
$root_dbh->do("DROP DATABASE $dbname");
};
}
# --------------------------------------------------------------------------
# Data-access things we override
# --------------------------------------------------------------------------
# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
# TODO: add locks around entire table
sub create_class {
my ($self, $dmid, $classname) = @_;
my $dbh = $self->dbh;
# get the max class id in this domain
my $maxid = $dbh->selectrow_array
('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
# now insert the new class
my $rv = eval {
$dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
undef, $dmid, $maxid + 1, $classname, 2);
};
if ($@ || $dbh->err) {
# first is error code for duplicates
if ($self->was_duplicate_error) {
throw("dup");
}
}
return $maxid + 1 if $rv;
$self->condthrow;
die;
}
# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
sub rename_file {
my ($self, $fidid, $to_key) = @_;
my $dbh = $self->dbh;
eval {
$dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
undef, $to_key, $fidid);
};
if ($@ || $dbh->err) {
# first is error code for duplicates
if ($self->was_duplicate_error) {
return 0;
} else {
die $@;
}
}
$self->condthrow;
return 1;
}
# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
my ($self, $fidid, $devid) = @_;
my $dbh = $self->dbh;
eval {
$dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
};
return 1 if !$@ && !$dbh->err;
return 0;
}
# update the device count for a given fidid
sub update_devcount_atomic {
my ($self, $fidid) = @_;
my $rv;
$self->dbh->begin_work;
$rv = $self->dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
$self->condthrow;
if($rv == 0) {
$self->dbh->rollback;
return 1;
}
$rv = $self->dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?", undef, $fidid, $fidid);
$self->condthrow;
$self->dbh->commit;
$self->condthrow;
return $rv;
}
sub should_begin_replicating_fidid {
my ($self, $fidid) = @_;
my $lockname = "mgfs:fid:$fidid:replicate";
return 1 if $self->get_lock($lockname, 1);
return 0;
}
sub note_done_replicating {
my ($self, $fidid) = @_;
my $lockname = "mgfs:fid:$fidid:replicate";
$self->release_lock($lockname);
}
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
my ($self, $fidid, $from_devid, $in) = @_;
my $dbh = $self->dbh;
my $nexttry = 0;
if ($in) {
$nexttry = $self->unix_timestamp." + ${in}::int";
}
eval {
$dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
undef, $fidid, $from_devid);
};
}
# reschedule all deferred replication, return number rescheduled
sub replicate_now {
my ($self) = @_;
return $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." WHERE nexttry > ".$self->unix_timestamp);
}
sub reschedule_file_to_replicate_relative {
my ($self, $fid, $in_n_secs) = @_;
$self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." + ?, failcount = failcount + 1 WHERE fid = ?",
undef, $in_n_secs, $fid);
}
# creates a new domain, given a domain namespace string. return the dmid on success,
# throw 'dup' on duplicate name.
sub create_domain {
my ($self, $name) = @_;
my $dbh = $self->dbh;
# get the max domain id
my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
my $rv = eval {
$dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
undef, $maxid + 1, $name);
};
if ($self->was_duplicate_error) {
throw("dup");
}
return $maxid+1 if $rv;
die "failed to make domain"; # FIXME: the above is racy.
}
sub set_server_setting {
my ($self, $key, $val) = @_;
my $dbh = $self->dbh;
if (defined $val) {
$self->insert_or_update(
insert => "INSERT INTO server_settings (field, value) VALUES (?, ?)",
insert_vals => [ $key, $val ],
update => "UPDATE server_settings SET value = ? WHERE field = ?",
update_vals => [ $val, $key ],
);
} else {
$dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
}
die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
return 1;
}
# This implementation is race-safe
sub incr_server_setting {
my ($self, $key, $val) = @_;
$val = 1 unless defined $val;
return unless $val;
$self->dbh->begin_work;
my $value = $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",undef,$key);
if($value) {
if($value =~ /^\d+$/) {
$value += $val;
} else {
warning("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
$value = $val;
}
my $rv = $self->dbh->do("UPDATE server_settings ".
"SET value=? ".
"WHERE field=?", undef,
$value, $key) > 0;
$self->dbh->commit;
return 1 if $rv;
}
$self->dbh->rollback; # Release the row-lock
$self->set_server_setting($key, $val);
}
# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
my ($self, $devid, $hostid, $status) = @_;
my $rv = $self->conddup(sub {
$self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
$devid, $hostid, $status);
});
$self->condthrow;
die "error making device $devid\n" unless $rv > 0;
return 1;
}
sub mark_fidid_unreachable {
my ($self, $fidid) = @_;
my $dbh = $self->dbh;
eval {
$self->insert_or_update(
insert => "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
insert_vals => [ $fidid ],
update => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE field = ?",
update_vals => [ $fidid ],
);
};
}
sub delete_fidid {
my ($self, $fidid) = @_;
$self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
$self->condthrow;
$self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
$self->condthrow;
$self->insert_or_ignore(
insert => "INSERT INTO file_to_delete (fid) VALUES (?)",
insert_vals => [ $fidid ],
);
$self->condthrow;
}
sub replace_into_file {
my $self = shift;
my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
$self->insert_or_update(
insert => "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
insert_vals => [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
update => "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
update_vals => [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
);
$self->condthrow;
}
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
my ($self, @devfids) = @_;
my @qmarks = map { "(?,?)" } @devfids;
my @binds = map { $_->fidid, $_->devid } @devfids;
my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
foreach (@devfids) {
eval {
$sth->execute($_->fidid, $_->devid);
};
$self->condthrow unless $self->was_duplicate_error;
}
return 1;
}
sub lockid {
my ($lockname) = @_;
croak("Called with empty lockname! $lockname") unless (defined $lockname && length($lockname) > 0);
my $num = unpack 'N',md5($lockname);
return ($num & 0x7fffffff);
}
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# the lock should be unique in the space of (lockid), as well the space of
# (hostname,pid).
# returns 1 on success and 0 on timeout
sub get_lock {
use Sys::Hostname;
my ($self, $lockname, $timeout) = @_;
my $lockid = lockid($lockname);
die "Lock recursion detected (grabbing $lockname ($lockid), had $self->{last_lock} (".lockid($self->{last_lock})."). Bailing out." if $self->{lock_depth};
debug("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;
my $lock = undef;
while($timeout > 0 and not defined($lock)) {
$lock = eval { $self->dbh->do('INSERT INTO lock (lockid,hostname,pid,acquiredat) VALUES (?, ?, ?, '.$self->unix_timestamp().')', undef, $lockid, hostname, $$) };
if($self->was_duplicate_error) {
$timeout--;
sleep 1;
next;
}
$self->condthrow;
#$lock = $self->dbh->selectrow_array("SELECT pg_try_advisory_lock(?, ?)", undef, $lockid, $timeout);
#warn("$$ Lock result=$lock\n");
if (defined $lock and $lock == 1) {
$self->{lock_depth} = 1;
$self->{last_lock} = $lockname;
} else {
die "Something went horribly wrong while getting lock $lockname";
}
}
return $lock;
}
# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
my ($self, $lockname) = @_;
my $lockid = lockid($lockname);
debug("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG >= 5;
#my $rv = $self->dbh->selectrow_array("SELECT pg_advisory_unlock(?)", undef, $lockid);
my $rv = $self->dbh->do('DELETE FROM lock WHERE lockid=? AND pid=?', undef, $lockid, $$);
debug("Double-release of lock $lockname!") if $self->{lock_depth} != 0 and $rv == 0 and $Mgd::DEBUG >= 2;
$self->condthrow;
$self->{lock_depth} = 0;
return $rv;
}
# return array of { dmid => ..., classid => ..., devcount => ..., count => ... }
sub get_stats_files_per_devcount {
my ($self) = @_;
my $dbh = $self->dbh;
my @ret;
my $sth = $dbh->prepare('SELECT dmid, classid, t1.devcount, COUNT(t1.devcount) AS "count" '.
'FROM file JOIN ('.
'SELECT fid, COUNT(devid) AS devcount FROM file_on GROUP BY fid'.
') t1 USING (fid) '.
'GROUP BY 1, 2, 3');
$sth->execute;
while (my $row = $sth->fetchrow_hashref) {
push @ret, $row;
}
return @ret;
}
1;
__END__
=head1 NAME
MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS
=head1 SEE ALSO
L<MogileFS::Store>
syntax highlighted by Code2HTML, v. 0.9.1