MogileFS-Server

 view release on metacpan or  search on metacpan

lib/MogileFS/Store.pm  view on Meta::CPAN


sub domain_has_files {
    my ($self, $dmid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
                                                undef, $dmid);
    return $has_a_fid ? 1 : 0;
}

sub domain_has_classes {
    my ($self, $dmid) = @_;
    # queryworker does not permit removing default class, so domain_has_classes
    # should not register the default class
    my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
        undef, $dmid);
    return defined($has_a_class);
}

sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                                undef, $dmid, $clid);
    return $has_a_fid ? 1 : 0;
}

# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    my ($clsid, $rv);

    eval {
        $dbh->begin_work;
        if ($classname eq 'default') {
            $clsid = 0;
        } else {
            # get the max class id in this domain
            my $maxid = $dbh->selectrow_array
                ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
            $clsid = $maxid + 1;
        }
        # now insert the new class
        $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                       undef, $dmid, $clsid, $classname, 2);
        $dbh->commit if $rv;
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            # ensure we're not inside a transaction
            if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
            throw("dup");
        }
    }
    $self->condthrow; # this will rollback on errors
    return $clsid if $rv;
    die;
}

# return 1 on success, throw "dup" on duplicate name error, die otherwise
sub update_class_name {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid classname)], @_);
    my $rv = eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, $arg{classname}, $arg{dmid}, $arg{classid});
    };
    throw("dup") if $self->was_duplicate_error;
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_mindevcount {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid mindevcount)], @_);
    eval {
    $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                   undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_replpolicy {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid replpolicy)], @_);
    eval {
    $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
                   undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_hashtype {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid hashtype)], @_);
    eval {
    $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
                   undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
}

sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                       undef, $dmid, $classid, $devcount);
}

sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    eval {
        if (defined $val) {
            $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
        } else {
            $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
        }
    };

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}

# FIXME: racy.  currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    return 1 if $self->dbh->do("UPDATE server_settings ".
                               "SET value=value+? ".
                               "WHERE field=?", undef,
                               $val, $key) > 0;
    $self->set_server_setting($key, $val);
}

sub server_setting {
    my ($self, $key) = @_;
    return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                       undef, $key);
}

sub server_settings {
    my ($self) = @_;
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;
    while (my ($k, $v) = $sth->fetchrow_array) {
        $ret->{$k} = $v;
    }
    return $ret;

lib/MogileFS/Store.pm  view on Meta::CPAN


# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $fidid or undef if no row.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE fid=?",
                                         undef, $fidid);
}

# return an arrayref of rows containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a pair of $fidid or undef if no rows.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $count) = @_;
    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                  "FROM file WHERE fid > ? LIMIT ?");
    $sth->execute($fromfid,$count);
    return $sth->fetchall_arrayref({});
}

# return array of devids that a fidid is on
sub fid_devids {
    my ($self, $fidid) = @_;
    return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                             undef, $fidid) || [] };
}

# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $in = join(",", map { $_+0 } @fidids);
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    }
    return $ret;
}

# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
                                         "FROM tempfile WHERE fid=?",
                                         undef, $fidid);
}

# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $rv = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    });
    $self->condthrow;
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}

sub update_device {
    my ($self, $devid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
            . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
            $devid);
    });
    return 1;
}

sub update_device_usage {
    my $self = shift;
    my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
    eval {
        $self->dbh->do("UPDATE device SET ".
                       "mb_total = ?, mb_used = ?, mb_asof = ?" .
                       " WHERE devid = ?",
                       undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
                       $arg{devid});
    };
    $self->condthrow;
}

# MySQL has an optimized version
sub update_device_usages {
    my ($self, $updates, $cb) = @_;
    foreach my $upd (@$updates) {
        $self->update_device_usage(%$upd);
        $cb->();
    }
}

# This is unimplemented at the moment as we must verify:
# - no file_on rows exist
# - nothing in file_to_queue is going to attempt to use it
# - nothing in file_to_replicate is going to attempt to use it
# - it's already been marked dead
# - that all trackers are likely to know this :/
# - ensure the devid can't be reused
# IE; the user can't mark it dead then remove it all at once and cause their
# cluster to implode.
sub delete_device {
    die "Unimplemented; needs further testing";
}

sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    eval {
        $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
    };
    $self->condthrow;
}

sub set_device_state {
    my ($self, $devid, $state) = @_;
    eval {
        $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
    };
    $self->condthrow;
}

sub delete_class {
    my ($self, $dmid, $cid) = @_;
    throw("has_files") if $self->class_has_files($dmid, $cid);
    eval {
        $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
    };
    $self->condthrow;
}

# called from a queryworker process, will trigger delete_fidid_enqueued
# in the delete worker
sub delete_fidid {
    my ($self, $fidid) = @_;
    eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
    $self->condthrow;
    $self->enqueue_for_delete2($fidid, 0);
    $self->condthrow;
}

# Only called from delete workers (after delete_fidid),
# this reduces client-visible latency from the queryworker
sub delete_fidid_enqueued {
    my ($self, $fidid) = @_;
    eval { $self->delete_checksum($fidid); };

lib/MogileFS/Store.pm  view on Meta::CPAN

    # exist one. Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;
}

# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                            undef, $fidid, $devid); };
    $self->condthrow;
    return $rv;
}

# Test if host exists.
sub get_hostid_by_id {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
        undef, $_[0]);
    return $hostid;
}

sub get_hostid_by_name {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
        undef, $_[0]);
    return $hostid;
}

# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}

# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $self->condthrow;
    $sth->execute;
    my @return;
    while (my $row = $sth->fetchrow_hashref) {
        push @return, $row;
    }
    return @return;
}

# update the device count for a given fidid
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);

    eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
              $ct, $fidid); };
    $self->condthrow;

    return 1;
}

# update the classid for a given fidid
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $dbh = $self->dbh;

    $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
              $classid, $fidid);

    $self->condthrow;
    return 1;
}

# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    my $nexttry = 0;
    if ($in) {
        $nexttry = $self->unix_timestamp . " + " . int($in);
    }

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}

# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# some redundancy.
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
                             "VALUES (?,$nexttry)", undef, $fidid);
    });
}

# enqueue a fidid for work
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        if (ref($fidid)) {
            $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
                                 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                 $fidid->[0], $fidid->[1], $fidid->[2], $type);
        } else {
            $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
                                 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
    });
}

lib/MogileFS/Store.pm  view on Meta::CPAN

    croak("must supply at least a devid") unless $devid;
    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;
    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = '';
    my $order   = '';
    $age ||= 'old';
    if ($age eq 'old') {
        $fidsort = 'AND fid > ?' if $fidid;
        $order   = 'ASC';
    } elsif ($age eq 'new') {
        $fidsort = 'AND fid < ?' if $fidid;
        $order   = 'DESC';
    } else {
        croak("invalid age argument: " . $age);
    }
    $limit ||= 100;
    my @extra = ();
    push @extra, $fidid if $fidid;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
    return $fidids;
}

# gets fidids above fidid_low up to (and including) fidid_high
sub get_fidids_between {
    my ($self, $fidid_low, $fidid_high, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
        WHERE fid > ? and fid <= ?
        ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
    return $fidids;
}

# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain";  # FIXME: the above is racy.
}

sub update_host {
    my ($self, $hid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
            . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
            $hid);
    });
    return 1;
}

# return ne hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    return $hid if $rv;
    die "db failure";
}

# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}

# "new" style queue consumption code.
# from within a transaction, fetch a limit of fids,
# then update each fid's nexttry to be off in the future,
# giving local workers some time to dequeue the items.
# Note:
# DBI (even with RaiseError) returns weird errors on
# deadlocks from selectall_hashref. So we can't do that.
# we also used to retry on deadlock within the routine,
# but instead lets return undef and let job_master retry.
sub grab_queue_chunk {
    my $self      = shift;
    my $queue     = shift;
    my $limit     = shift;
    my $extfields = shift;

    my $dbh = $self->dbh;
    my $tries = 3;
    my $work;



( run in 0.662 second using v1.01-cache-2.11-cpan-39bf76dae61 )