DBIx-Class-Async

 view release on metacpan or  search on metacpan

lib/DBIx/Class/Async/ResultSet.pm  view on Meta::CPAN

    # 3. Clear any existing inflated rows and reset position
    # This ensures that if the cache is updated, the inflation happens again
    $self->{_rows} = undef;
    $self->{_pos}  = 0;

    return $self;
}

=head2 update

    # Bulk update all rows in the current ResultSet
    $rs->search({ status => 'pending' })->update({ status => 'processing' });

    # Single targeted update ignoring current RS filters
    $rs->update({ id => 42 }, { status => 'archived' });

Performs an asynchronous C<UPDATE> operation on the database.

=over 4

=item * Set-Based Operation

Unlike a row-level update, this method acts on the entire scope of the
ResultSet in a single database round-trip.

=item * Cache Invalidation

Automatically calls C<clear_cache> on the ResultSet. This prevents the
parent process from serving stale data after the update has completed.

=item * Deflation Support

Automatically detects columns that require custom serialisation (e.g.,
JSON to string, DateTime to ISO string) by consulting the C<_custom_inflators> registry.

=item * Flexible Signature

=over 4

=item * C<update(\%updates)>: Uses the ResultSet's existing C<where> clause.

=item * C<update(\%cond, \%updates)>: Overrides the current filters with the provided C<%cond>.

=back

=back

Example: Atomic Batch Update with Deflation

    # Assuming 'metadata' is a column that deflates to JSON
    my $future = $rs->search({ active => 1 })
                    ->update({
                        last_updated => \'NOW()',
                        metadata     => { version => '2.0', source => 'api' }
                    });

    $future->on_done(sub { say "Batch update complete." });

=cut

sub update {
    my $self = shift;
    my ($cond, $updates);

    # Two call shapes are supported:
    #   ->update(\%updates)          - condition comes from the ResultSet
    #   ->update(\%cond, \%updates)  - explicit condition replaces it
    if (@_ > 1) {
        ($cond, $updates) = @_;
    } else {
        $updates = shift;
        $cond    = $self->{_cond};
    }

    # NOTE(review): the cache key derived below is computed but not used
    # anywhere in this method. It is kept as-is so that no call into
    # result_source / _generate_cache_key is dropped; confirm whether cache
    # invalidation is supposed to be wired up here.
    my @pk_cols = $self->result_source->primary_columns;
    my $cache_key;

    if (ref($cond) eq 'HASH' && @pk_cols) {
        # Prefer a key built only from the primary-key part of the condition
        my %pk_cond = map {
            $_ => $cond->{$_}
        } grep {
            exists $cond->{$_}
        } @pk_cols;

        if (%pk_cond) {
            $cache_key = $self->_generate_cache_key(0, \%pk_cond);
        }
    }

    $cache_key ||= $self->_generate_cache_key(0, $cond);

    my $db = $self->{_async_db};
    my $inflators = $db->{_custom_inflators}{ $self->{_source_name} } || {};

    # Deflate on a shallow copy: the caller's hashref must never be modified,
    # otherwise a reused \%updates would be deflated a second time (and the
    # caller would see its nested structures replaced by strings).
    my %deflated = %{ $updates || {} };
    foreach my $col (keys %deflated) {
        my $deflate = $inflators->{$col} && $inflators->{$col}{deflate};
        $deflated{$col} = $deflate->($deflated{$col}) if $deflate;
    }

    # Hand the operation to the worker; the return value is whatever
    # _call_worker returns (a Future, per this method's POD).
    return DBIx::Class::Async::_call_worker(
        $db,
        'update',
        {
            source_name => $self->{_source_name},
            cond        => $cond,
            updates     => \%deflated,
        }
    );
}

=head2 update_all

    my $future = $rs->search({ type => 'temporary' })->update_all({ type => 'permanent' });

Performs a two-step asynchronous update. First, it retrieves all rows matching
the current criteria to identify their Primary Keys, then it issues a
bulk update targeting those specific IDs.

=over 4

=item * Precision

By fetching the IDs first, this method ensures that triggers or logic dependent
on the primary key are correctly handled.

=item * Safety

If no rows match the initial search, the method short-circuits and returns a
resolved Future with a value of C<0>, avoiding an unnecessary database trip
for the update phase.

=item * Traceability

Supports C<ASYNC_TRACE> logging to help debug empty sets or unexpected data
types during the fetch phase.

=item * Atomicity Note

Unlike C<update>, this involves two distinct database interactions. If the
data changes between the fetch and the update phase, only the rows identified
in the first phase will be updated.

=back

Example: Safe Batch Update

    $rs->update_all({ last_processed => \'NOW()' })->on_done(sub {
        my $count = shift;
        print "Successfully updated $count specific records.\n";
    });

=cut

sub update_all {
    my ($self, $updates) = @_;
    my $bridge = $self->{_async_db};

    # Phase 1: fetch the matching rows so the update can target them by key.
    return $self->all->then(sub {
        my $rows = shift;

        # Nothing to do (or an unexpected payload): resolve with 0 rather
        # than issuing an update.
        unless ($rows && ref($rows) eq 'ARRAY' && @$rows) {
            warn "[PID $$] update_all found no rows to update or invalid data type"
                if ASYNC_TRACE;
            return Future->done(0);
        }

        my @pk_cols = $self->result_source->primary_columns;

        # Without a primary key the fetched rows cannot be re-identified.
        unless (@pk_cols) {
            return Future->fail(
                "update_all requires a primary key on source '$self->{_source_name}'",
                'logic_error',
            );
        }

        my $cond;
        if (@pk_cols == 1) {
            # Single-column key: one compact IN (...) list.
            my $pk  = $pk_cols[0];
            my @ids = map { $_->get_column($pk) } @$rows;
            $cond = { $pk => { -in => \@ids } };
        }
        else {
            # Composite key: matching on the first column alone could touch
            # rows that were never fetched, so each row contributes its full
            # key and the per-row conditions are OR-ed together.
            my @row_conds;
            for my $row (@$rows) {
                my %key;
                $key{$_} = $row->get_column($_) for @pk_cols;
                push @row_conds, \%key;
            }
            $cond = { -or => \@row_conds };
        }

        # Apply the same custom deflators that update() applies, working on a
        # copy so the caller's hashref is left untouched.
        my $payload_updates = $updates;
        if (ref($updates) eq 'HASH') {
            my $inflators = $bridge->{_custom_inflators}{ $self->{_source_name} } || {};
            my %deflated  = %$updates;
            for my $col (keys %deflated) {
                my $deflate = $inflators->{$col} && $inflators->{$col}{deflate};
                $deflated{$col} = $deflate->($deflated{$col}) if $deflate;
            }
            $payload_updates = \%deflated;
        }

        my $payload = {
            source_name => $self->{_source_name},
            cond        => $cond,
            updates     => $payload_updates,
        };

        # Phase 2: bulk update of exactly the rows identified above. Only the
        # first value the worker resolves with is passed on.
        return DBIx::Class::Async::_call_worker(
            $bridge,
            'update',
            $payload)->then(sub {
            my $affected = shift;
            return Future->done($affected);
        });
    });
}

=head2 update_or_new

    my $future = $rs->update_or_new({
        email => 'dev@example.com',
        name  => 'Gemini'
    });

Attempts to locate a record using its unique constraints or primary key.

=over 4

=item * Action on Success

If the record is found, it immediately triggers an asynchronous C<update>
with the provided data and returns a L<Future> resolving to the updated Row object.

=item * Action on Failure

If no record is found, it creates a new B<in-memory> row object. This object
is B<not> yet saved to the database (C<in_storage> will be false).

=item * Data Sanitisation

Automatically strips table aliases (C<me.>, C<foreign.>) from the data keys
to ensure the Row object constructor receives clean column names.

=item * Consistency

This method always returns a B<Future>, regardless of whether it performed a
database update or a local object instantiation.

=back

Example: Syncing User Profiles

    $rs->update_or_new({
        external_id => $id,
        last_login  => \'NOW()'
    })->then(sub {
        my $user = shift;
        if ($user->in_storage) {
            say "Updated existing user: " . $user->id;
        } else {
            say "Prepared new user for registration.";
            # You must call ->insert on the new object to persist it
            return $user->insert;
        }
    });

=cut

sub update_or_new {
    my ($self, $data, $attrs) = @_;
    $attrs //= {};

    # Identify the primary key or unique constraint values for the lookup
    my $lookup = $self->_extract_unique_lookup($data, $attrs);

    return $self->find($lookup, $attrs)->then(sub {
        my ($row) = @_;

        # Found: delegate to the row's own update and return its result
        return $row->update($data) if $row;

        # Not found: build the column set for a local, unsaved row object.
        # Table aliases are removed from the keys before they are handed to
        # new_result.
        my $strip_alias = sub {
            my ($key) = @_;
            $key =~ s/^(?:me|foreign|self)\.//;
            return $key;
        };

        my %clean_data;

        # Seed with the ResultSet condition first. Only a hashref condition
        # can be read as column => value pairs, and keys starting with '-'
        # are condition operators (-and / -or / ...), never column names.
        my $rs_cond = $self->{_cond};
        if (ref($rs_cond) eq 'HASH') {
            for my $key (sort keys %$rs_cond) {
                next if $key =~ /^-/;
                $clean_data{ $strip_alias->($key) } = $rs_cond->{$key};
            }
        }

        # Overlay the supplied data afterwards so it always takes precedence,
        # even when the condition used an aliased spelling of the same column
        # (e.g. 'me.status' in the condition vs 'status' in the data).
        my $supplied = $data || {};
        for my $key (sort keys %$supplied) {
            $clean_data{ $strip_alias->($key) } = $supplied->{$key};
        }

        # Resolve with the object built by new_result; nothing is written to
        # the database on this path.
        return Future->done($self->new_result(\%clean_data));
    });
}

=head2 update_or_create

    my $future = $rs->update_or_create({
        username => 'coder123',
        last_seen => \'NOW()'
    });

Attempts to find a record by its unique constraints. If found, it updates it.
If not, it creates a new record in the database.

=over 4

=item * Atomic Strategy

This method manages the "Check-then-Act" pattern safely across asynchronous workers.

=item * Race Condition Recovery

In highly concurrent systems, a record might be inserted by another process
between this method's C<find> and C<create> steps. This method detects that
specific database conflict (Unique Constraint Violation) and automatically
recovers by re-finding the newly created record and updating it instead.

=item * Error Handling

While it handles "Already Exists" conflicts gracefully, other database errors
(like type mismatches or connection issues) will still trigger a C<fail>
state in the returned Future.

=back

Example: Distributed Token Sync

    # Multiple workers might run this for the same 'service_key'
    $schema->resultset('AuthToken')->update_or_create({
        service_key => 'worker_node_1',
        token       => $new_secure_token
    })->on_done(sub {
        my $row = shift;
        say "Token synced for ID: " . $row->id;
    })->on_fail(sub {
        die "Sync failed: " . shift;
    });

=cut

sub update_or_create {
    my ($self, $data, $attrs) = @_;
    $attrs //= {};

    # Values used to locate an existing record
    my $lookup = $self->_extract_unique_lookup($data, $attrs);

    return $self->find($lookup, $attrs)->then(sub {
        my ($row) = @_;

        # 1. Standard Update Path
        return $row->update($data) if $row;

        # 2. Not Found: Attempt Create
        return $self->create($data)->catch(sub {
            my @failure = @_;
            my ($error, $type) = @failure;

            # A failure may arrive without a category, so compare against an
            # empty string instead of undef. The message patterns cover the
            # wording used by common drivers for unique-key conflicts
            # ("UNIQUE constraint failed", "violates unique constraint",
            # "already exists", "Duplicate entry", "duplicate key").
            my $is_conflict = ($type // '') eq 'db_error'
                && defined $error
                && "$error" =~ /unique constraint|already exists|duplicate (?:entry|key)/i;

            # Anything else bubbles up exactly as it was received, including
            # any extra failure details beyond message and category.
            return Future->fail(@failure) unless $is_conflict;

            # 3. Race Recovery: someone else inserted first, so re-find the
            #    winner and update it instead.
            return $self->find($lookup, $attrs)->then(sub {
                my ($recovered) = @_;
                return $recovered
                    ? $recovered->update($data)
                    : Future->fail("Race recovery failed: record vanished after conflict", "logic_error");
            });
        });
    });
}

=head2 update_query

    my ($sql, @bind) = $rs->update_query(\%updates);
    my $query = $rs->update_query(\%updates);

Returns the SQL and bind values that would be generated by an update operation
without actually executing it. This is useful for debugging, logging, auditing,
or testing.

Returns the same structure as L</as_query>: a reference to an array containing
the SQL (as a scalar reference) and bind values.

    my $rs = $schema->resultset('User')->search({ active => 0 });
    my ($sql, @bind) = $rs->update_query({ active => 1 });
    # $sql = \'UPDATE users SET active = ? WHERE active = ?'
    # @bind = (1, 0)

    # Audit logging
    warn "About to execute: $$sql with binds: @bind";
    $rs->update({ active => 1 })->get;  # Now execute

=cut

sub update_query {
    my ($self, $values) = @_;

    my $bridge       = $self->{_async_db};
    my $schema_class = $bridge->{_schema_class};

    unless ($schema_class->can('resultset')) {
        eval "require $schema_class" or die "update_query: $@";
    }

    # Silence warnings
    local $SIG{__WARN__} = sub {
        if (ASYNC_TRACE) {
            warn @_ unless $_[0] =~ /undetermined_driver|sql_limit_dialect|GenericSubQ/
        }
    };

    unless ($bridge->{_metadata_schema}) {
        $bridge->{_metadata_schema} = $schema_class->connect('dbi:NullP:');
    }

    # Create a sync ResultSet with the same conditions
    my $real_rs = $bridge->{_metadata_schema}
                         ->resultset($self->{_source_name})
                         ->search($self->{_cond}, $self->{_attrs});

    # Generate SQL using DBIC's internal update SQL generator
    my $storage = $bridge->{_metadata_schema}->storage;

    # Get the update SQL - use DBIC's internal methods
    my ($sql, @bind);

    # DBIC stores the SQL generation in storage->update
    # We need to use the same path as actual execution but capture the SQL
    eval {
        # Get source and create update statement
        my $source = $real_rs->result_source;
        my $cond   = $real_rs->{cond};
        my $attrs  = $real_rs->{attrs};

        # Build WHERE clause from search conditions
        my ($where_sql, @where_bind) = $storage->sql_maker->where($cond);

        # Build SET clause from update values
        my (@set_parts, @set_bind);
        for my $col (sort keys %$values) {
            push @set_parts, "$col = ?";
            push @set_bind, $values->{$col};
        }
        my $set_sql = join(', ', @set_parts);

        # Construct full UPDATE statement
        my $table = $source->from;
        $sql  = \"UPDATE $table SET $set_sql$where_sql";
        @bind = (@set_bind, @where_bind);
    };

    if ($@) {
        croak("Failed to generate update SQL: $@");
    }



( run in 0.477 second using v1.01-cache-2.11-cpan-39bf76dae61 )