MogileFS-Server
view release on metacpan or search on metacpan
lib/MogileFS/Store.pm view on Meta::CPAN
sub domain_has_files {
my ($self, $dmid) = @_;
my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
undef, $dmid);
return $has_a_fid ? 1 : 0;
}
sub domain_has_classes {
my ($self, $dmid) = @_;
# queryworker does not permit removing default class, so domain_has_classes
# should not register the default class
my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
undef, $dmid);
return defined($has_a_class);
}
sub class_has_files {
my ($self, $dmid, $clid) = @_;
my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
undef, $dmid, $clid);
return $has_a_fid ? 1 : 0;
}
# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
sub create_class {
my ($self, $dmid, $classname) = @_;
my $dbh = $self->dbh;
my ($clsid, $rv);
eval {
$dbh->begin_work;
if ($classname eq 'default') {
$clsid = 0;
} else {
# get the max class id in this domain
my $maxid = $dbh->selectrow_array
('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
$clsid = $maxid + 1;
}
# now insert the new class
$rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
undef, $dmid, $clsid, $classname, 2);
$dbh->commit if $rv;
};
if ($@ || $dbh->err) {
if ($self->was_duplicate_error) {
# ensure we're not inside a transaction
if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
throw("dup");
}
}
$self->condthrow; # this will rollback on errors
return $clsid if $rv;
die;
}
# return 1 on success, throw "dup" on duplicate name error, die otherwise
sub update_class_name {
my $self = shift;
my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
my $rv = eval {
$self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
undef, $arg{classname}, $arg{dmid}, $arg{classid});
};
throw("dup") if $self->was_duplicate_error;
$self->condthrow;
return 1;
}
# return 1 on success, die otherwise
sub update_class_mindevcount {
my $self = shift;
my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
eval {
$self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
};
$self->condthrow;
return 1;
}
# return 1 on success, die otherwise
sub update_class_replpolicy {
my $self = shift;
my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
eval {
$self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
};
$self->condthrow;
return 1;
}
# return 1 on success, die otherwise
sub update_class_hashtype {
my $self = shift;
my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_);
eval {
$self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
};
$self->condthrow;
}
sub nfiles_with_dmid_classid_devcount {
my ($self, $dmid, $classid, $devcount) = @_;
return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
undef, $dmid, $classid, $devcount);
}
sub set_server_setting {
my ($self, $key, $val) = @_;
my $dbh = $self->dbh;
die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
eval {
if (defined $val) {
$dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
} else {
$dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
}
};
die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
return 1;
}
# FIXME: racy. currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
my ($self, $key, $val) = @_;
$val = 1 unless defined $val;
return unless $val;
return 1 if $self->dbh->do("UPDATE server_settings ".
"SET value=value+? ".
"WHERE field=?", undef,
$val, $key) > 0;
$self->set_server_setting($key, $val);
}
sub server_setting {
my ($self, $key) = @_;
return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
undef, $key);
}
sub server_settings {
my ($self) = @_;
my $ret = {};
my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
$sth->execute;
while (my ($k, $v) = $sth->fetchrow_array) {
$ret->{$k} = $v;
}
return $ret;
lib/MogileFS/Store.pm view on Meta::CPAN
# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $fidid or undef if no row.
sub file_row_from_fidid {
my ($self, $fidid) = @_;
return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
"FROM file WHERE fid=?",
undef, $fidid);
}
# return an arrayref of rows containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a pair of $fidid or undef if no rows.
sub file_row_from_fidid_range {
my ($self, $fromfid, $count) = @_;
my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
"FROM file WHERE fid > ? LIMIT ?");
$sth->execute($fromfid,$count);
return $sth->fetchall_arrayref({});
}
# return array of devids that a fidid is on
sub fid_devids {
my ($self, $fidid) = @_;
return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
undef, $fidid) || [] };
}
# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
sub fid_devids_multiple {
my ($self, @fidids) = @_;
my $in = join(",", map { $_+0 } @fidids);
my $ret = {};
my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
$sth->execute;
while (my ($fidid, $devid) = $sth->fetchrow_array) {
push @{$ret->{$fidid} ||= []}, $devid;
}
return $ret;
}
# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
sub tempfile_row_from_fid {
my ($self, $fidid) = @_;
return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
"FROM tempfile WHERE fid=?",
undef, $fidid);
}
# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
my ($self, $devid, $hostid, $status) = @_;
my $rv = $self->conddup(sub {
$self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
$devid, $hostid, $status);
});
$self->condthrow;
die "error making device $devid\n" unless $rv > 0;
return 1;
}
sub update_device {
my ($self, $devid, $to_update) = @_;
my @keys = sort keys %$to_update;
return unless @keys;
$self->conddup(sub {
$self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
. "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
$devid);
});
return 1;
}
sub update_device_usage {
my $self = shift;
my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
eval {
$self->dbh->do("UPDATE device SET ".
"mb_total = ?, mb_used = ?, mb_asof = ?" .
" WHERE devid = ?",
undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
$arg{devid});
};
$self->condthrow;
}
# MySQL has an optimized version
sub update_device_usages {
my ($self, $updates, $cb) = @_;
foreach my $upd (@$updates) {
$self->update_device_usage(%$upd);
$cb->();
}
}
# This is unimplemented at the moment as we must verify:
# - no file_on rows exist
# - nothing in file_to_queue is going to attempt to use it
# - nothing in file_to_replicate is going to attempt to use it
# - it's already been marked dead
# - that all trackers are likely to know this :/
# - ensure the devid can't be reused
# IE; the user can't mark it dead then remove it all at once and cause their
# cluster to implode.
sub delete_device {
die "Unimplemented; needs further testing";
}
sub set_device_weight {
my ($self, $devid, $weight) = @_;
eval {
$self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
};
$self->condthrow;
}
sub set_device_state {
my ($self, $devid, $state) = @_;
eval {
$self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
};
$self->condthrow;
}
sub delete_class {
my ($self, $dmid, $cid) = @_;
throw("has_files") if $self->class_has_files($dmid, $cid);
eval {
$self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
};
$self->condthrow;
}
# called from a queryworker process, will trigger delete_fidid_enqueued
# in the delete worker
sub delete_fidid {
my ($self, $fidid) = @_;
eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
$self->condthrow;
$self->enqueue_for_delete2($fidid, 0);
$self->condthrow;
}
# Only called from delete workers (after delete_fidid),
# this reduces client-visible latency from the queryworker
sub delete_fidid_enqueued {
my ($self, $fidid) = @_;
eval { $self->delete_checksum($fidid); };
lib/MogileFS/Store.pm view on Meta::CPAN
# exist one. Check REPLACE semantics.
my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
undef, $fidid, $devid);
return 1 if $rv > 0;
return 0;
}
# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
my ($self, $fidid, $devid) = @_;
my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
undef, $fidid, $devid); };
$self->condthrow;
return $rv;
}
# Test if host exists.
sub get_hostid_by_id {
my $self = shift;
my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
undef, $_[0]);
return $hostid;
}
sub get_hostid_by_name {
my $self = shift;
my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
undef, $_[0]);
return $hostid;
}
# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
my ($self) = @_;
my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
"hostip, http_port, http_get_port, altip, altmask FROM host");
$sth->execute;
my @ret;
while (my $row = $sth->fetchrow_hashref) {
push @ret, $row;
}
return @ret;
}
# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
my ($self) = @_;
my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
"mb_used, mb_asof, status, weight FROM device");
$self->condthrow;
$sth->execute;
my @return;
while (my $row = $sth->fetchrow_hashref) {
push @return, $row;
}
return @return;
}
# update the device count for a given fidid
sub update_devcount {
my ($self, $fidid) = @_;
my $dbh = $self->dbh;
my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
undef, $fidid);
eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
$ct, $fidid); };
$self->condthrow;
return 1;
}
# update the classid for a given fidid
sub update_classid {
my ($self, $fidid, $classid) = @_;
my $dbh = $self->dbh;
$dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
$classid, $fidid);
$self->condthrow;
return 1;
}
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
my ($self, $fidid, $from_devid, $in) = @_;
my $nexttry = 0;
if ($in) {
$nexttry = $self->unix_timestamp . " + " . int($in);
}
$self->retry_on_deadlock(sub {
$self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
"VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
});
}
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# some redundancy.
sub enqueue_for_delete2 {
my ($self, $fidid, $in) = @_;
$in = 0 unless $in;
my $nexttry = $self->unix_timestamp . " + " . int($in);
$self->retry_on_deadlock(sub {
$self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
"VALUES (?,$nexttry)", undef, $fidid);
});
}
# enqueue a fidid for work
sub enqueue_for_todo {
my ($self, $fidid, $type, $in) = @_;
$in = 0 unless $in;
my $nexttry = $self->unix_timestamp . " + " . int($in);
$self->retry_on_deadlock(sub {
if (ref($fidid)) {
$self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
"nexttry) VALUES (?,?,?,?,$nexttry)", undef,
$fidid->[0], $fidid->[1], $fidid->[2], $type);
} else {
$self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
"VALUES (?,?,$nexttry)", undef, $fidid, $type);
}
});
}
lib/MogileFS/Store.pm view on Meta::CPAN
croak("must supply at least a devid") unless $devid;
my $age = delete $o{age};
my $fidid = delete $o{fidid};
my $limit = delete $o{limit};
croak("invalid options: " . join(', ', keys %o)) if %o;
# If supplied a "previous" fidid, we're paging through.
my $fidsort = '';
my $order = '';
$age ||= 'old';
if ($age eq 'old') {
$fidsort = 'AND fid > ?' if $fidid;
$order = 'ASC';
} elsif ($age eq 'new') {
$fidsort = 'AND fid < ?' if $fidid;
$order = 'DESC';
} else {
croak("invalid age argument: " . $age);
}
$limit ||= 100;
my @extra = ();
push @extra, $fidid if $fidid;
my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
$fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
return $fidids;
}
# gets fidids above fidid_low up to (and including) fidid_high
sub get_fidids_between {
my ($self, $fidid_low, $fidid_high, $limit) = @_;
$limit ||= 1000;
$limit = int($limit);
my $dbh = $self->dbh;
my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
WHERE fid > ? and fid <= ?
ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
return $fidids;
}
# creates a new domain, given a domain namespace string. return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
my ($self, $name) = @_;
my $dbh = $self->dbh;
# get the max domain id
my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
my $rv = eval {
$dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
undef, $maxid + 1, $name);
};
if ($self->was_duplicate_error) {
throw("dup");
}
return $maxid+1 if $rv;
die "failed to make domain"; # FIXME: the above is racy.
}
sub update_host {
my ($self, $hid, $to_update) = @_;
my @keys = sort keys %$to_update;
return unless @keys;
$self->conddup(sub {
$self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
. "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
$hid);
});
return 1;
}
# return ne hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
my ($self, $hostname, $ip) = @_;
my $dbh = $self->dbh;
# racy! lazy. no, better: portable! how often does this happen? :)
my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
my $rv = $self->conddup(sub {
$dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
"VALUES (?, ?, ?, 'down')",
undef, $hid, $hostname, $ip);
});
return $hid if $rv;
die "db failure";
}
# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
my ($self, $limit) = @_;
my $ut = $self->unix_timestamp;
my $to_repl_map = $self->dbh->selectall_hashref(qq{
SELECT fid, fromdevid, failcount, flags, nexttry
FROM file_to_replicate
WHERE nexttry <= $ut
ORDER BY nexttry
LIMIT $limit
}, "fid") or return ();
return values %$to_repl_map;
}
# "new" style queue consumption code.
# from within a transaction, fetch a limit of fids,
# then update each fid's nexttry to be off in the future,
# giving local workers some time to dequeue the items.
# Note:
# DBI (even with RaiseError) returns weird errors on
# deadlocks from selectall_hashref. So we can't do that.
# we also used to retry on deadlock within the routine,
# but instead lets return undef and let job_master retry.
sub grab_queue_chunk {
my $self = shift;
my $queue = shift;
my $limit = shift;
my $extfields = shift;
my $dbh = $self->dbh;
my $tries = 3;
my $work;
( run in 0.662 second using v1.01-cache-2.11-cpan-39bf76dae61 )