AnyEvent-DBI

 view release on metacpan or  search on metacpan

t/fake-mysql  view on Meta::CPAN

C<REMOTE_SELECT> query is executed on the remote server and only the result is
sent back to the default server for further processing. This is useful if a lot
of processing is to be done on the remote server -- the Federated engine will
bring most of the data to the connecting server and will process it there,
which can potentially be very time consuming.

Please note that since a temporary table is created and it must reside
somewhere, you need to be in a working database on the default
server. Updateable C<VIEW>a are not supported.

=head2 Development support - devel.conf

This configuration provides the following operators:

C<shell> - can be used to execute shell commands, e.g. C<shell ls -la>.

C<env> - returns the operating environment of C<myserver.pl>.

C<stats> - executes C<SHOW STATUS> before and after each query and returns the
difference. First you execute

C<stats select a from b> and then C<show stats>.

C<devel> - can be used to send specfic queries to a different server. You can
execute a single query as

C<devel select a from b> or use a standalone C<devel> to redirect all future
queries until you issue C<restore>.

You specify the server to send "development" queries to via C<set('devel_dsn')>
at the top of C<devel.conf>

=head2 ODBC compatibility - odbc.conf

The C<odbc.conf> contains an example on how to unintelligently answer generic
queries sent by the MySQL ODBC driver and the applications that use it, up to
the point where real data can be sent over the connection and imported into the
client application.

=cut

use strict;
use Socket;
use DBI;
use DBIx::MyServer;
use DBIx::MyServer::DBI;
use Getopt::Long qw(:config pass_through);

$SIG{CHLD} = 'IGNORE';

my $start_dsn;
my $start_dsn_user;
my $start_dsn_password;

my $remote_dsn;
my $remote_dsn_user;
my $remote_dsn_password;

my $port = '23306';
my $interface = '127.0.0.1';
my $debug;
my @config_names;
my @rules;
my %storage;

my @args = @ARGV;

my $result = GetOptions(
	"dsn=s"			=> \$start_dsn,
	"dsn_user=s"		=> \$start_dsn_user,
	"dsn_password=s"	=> \$start_dsn_password,
	"remote_dsn=s"		=> \$remote_dsn,
	"remote_dsn_user=s"	=> \$remote_dsn_user,
	"remote_dsn_password=s"	=> \$remote_dsn_password,
	"port=i"		=> \$port,
	"config=s"		=> \@config_names,
	"if|interface|ip=s"	=> \$interface,
	"debug"			=> \$debug
) or die;

@ARGV = @args;

my $start_dbh;
if (defined $start_dsn) {
	print localtime()." [$$] Connecting to DSN $start_dsn.\n" if $debug;
	$start_dbh = DBI->connect($start_dsn, $start_dsn_user, $start_dsn_password);
}

$storage{dbh} = $start_dbh;
$storage{dsn} = $start_dsn;
$storage{dsn_user} = $start_dsn_user;
$storage{dsn_password} = $start_dsn_password;

$storage{remote_dsn} = $remote_dsn;
$storage{remote_dsn_user} = $remote_dsn_user;
$storage{remote_dsn_password} = $remote_dsn_password;

foreach my $config_name (@config_names) {
	my $config_sub;
	open (CONFIG_FILE, $config_name) or die "unable to open $config_name: $!";
	read (CONFIG_FILE, my $config_text, -s $config_name);
	close (CONFIG_FILE);
	eval ('$config_sub = sub { '.$config_text.'}') or die $@;
	my @config_rules = &$config_sub();
	push @rules, @config_rules;
	print localtime()." [$$] Loaded ".($#config_rules + 1)." rules from $config_name.\n" if $debug;
}

socket(SERVER_SOCK, PF_INET, SOCK_STREAM, getprotobyname('tcp'));
setsockopt(SERVER_SOCK, SOL_SOCKET, SO_REUSEADDR, pack("l", 1));
bind(SERVER_SOCK, sockaddr_in($port, inet_aton($interface))) || die "bind: $!";
listen(SERVER_SOCK,1);

print localtime()." [$$] Note: port $port is now open on interface $interface.\n" if $debug;
while (1) {
	my $remote_paddr = accept(my $remote_socket, SERVER_SOCK);

	if (!defined(my $pid = fork)) {
		die "cannot fork: $!";
	} elsif ($pid) {
		next;
	}

	$storage{dbh} = $start_dbh->clone() if defined $start_dbh;
	$storage{dsn} = $start_dsn;
	$storage{args}= \@ARGV;
	
	my $dbh = get('dbh');
	my $myserver = DBIx::MyServer::DBI->new(
		socket => $remote_socket,
		dbh => $dbh,
		banner => $0.' '.join(' ', @ARGV)
	);
	set('myserver', $myserver);

	$myserver->sendServerHello();
	my ($username, $database) = $myserver->readClientHello();
	set('username', $username); set('database', $database);

        eval {
		my $hersockaddr = getpeername($myserver->getSocket());
		my ($port, $iaddr) = sockaddr_in($hersockaddr);
		my $remote_host = inet_ntoa($iaddr);
		set('remote_host', $remote_host);
        };
	
	$myserver->sendOK();

	while (1) {
		my ($command, $query) = $myserver->readCommand();
		print localtime()." [$$] command: $command; data = $query\n" if $debug;
		last if (not defined $command) || ($command == DBIx::MyServer::COM_QUIT);

		my $outgoing_query = $query;

		foreach my $i (0..$#rules) {

			my $rule = $rules[$i];
			my $rule_matches = 0;

			my @placeholders;

			if (defined $rule->{command}) {
				if ($command == $rule->{command}) {
					$rule_matches = 1;
				} else {
					next;
				}
			} 

			my $match_type = ref($rule->{match});
			if (defined $rule->{match}) {
				$rule->{match_string} = $match_type eq 'CODE' ? $rule->{match}($query) : $rule->{match};
				if (ref($rule->{match_string}) eq 'Regexp') {
					$rule_matches = 1 if @placeholders = $query =~ $rule->{match};
				} else {
					$rule_matches = 1 if $query eq $rule->{match_string};
				}
				print localtime()." [$$] Executing 'match' from rule $i: $rule->{match_string}, result is $rule_matches.\n" if $debug;
			} else {
				$rule_matches = 1;
			}
			$rule->{placeholders} = \@placeholders;

			next if $rule_matches == 0;

			my ($definitions, $data);

			undef $storage{data_sent};

			if (defined $rule->{before}) {
				print localtime()." [$$] Executing 'before' from rule $i\n" if $debug;
				eval{
					$rule->{before}($query, @{$rule->{placeholders}});
				};
				error($@) if defined $@ && $@ ne '';
			}

			if (defined $rule->{rewrite}) {
				if (ref($rule->{rewrite}) eq 'CODE') {
					$outgoing_query = $rule->{rewrite}($query, @{$rule->{placeholders}});
				} else {
					$outgoing_query = $rule->{rewrite};
				}
				print localtime()." [$$] Executing 'rewrite' from rule $i, result is '$outgoing_query'\n" if $debug;
			} elsif (defined $rule->{match}) {
				$outgoing_query = $rule->{match_string} eq 'Regexp' ? $rule->{placeholders}->[0] : $outgoing_query;
			}

			if (defined $rule->{error}) {
				my @error = ref ($rule->{error}) eq 'CODE' ? $rule->{error}($query, @{$rule->{placeholders}}) : $rule->{error};
				my @mid_error = ref($error[0]) eq 'ARRAY' ? @{$error[0]} : @error;
				if (defined $mid_error[0]) {
					print localtime()." [$$] Sending error: ".join(', ', @mid_error).".\n" if $debug;
					error(@mid_error);
				}
			}

			if (defined $rule->{ok}) {
				my @ok = ref ($rule->{ok}) eq 'CODE' ? $rule->{ok}($query, @{$rule->{placeholders}}) : $rule->{ok};
				my @mid_ok = ref($ok[0]) eq 'ARRAY' ? @{$ok[0]} : @ok;
				if (defined $mid_ok[0]) {
					print localtime()." [$$] Sending OK: ".join(', ', @mid_ok).").\n" if $debug;
					ok(@mid_ok);
				}
			}

			if (defined $rule->{columns}) {
				my @column_names = ref($rule->{columns}) eq 'CODE' ? $rule->{columns}($query, @{$rule->{placeholders}}) : $rule->{columns};
				my $column_names;
				if (defined $column_names[1]) {
					$column_names = \@column_names;
				} elsif (ref($column_names[0]) eq 'ARRAY') {
					$column_names = $column_names[0];
				} elsif (defined $column_names[0]) {
					$column_names = [ $column_names[0] ];
				}
				print localtime()." [$$] Converting column_names into definitions.\n" if $debug;
				$definitions = [ map { $myserver->newDefinition( name => $_ ) } @$column_names ];
			}

			if (defined $rule->{data}) {
				my @start_data = ref($rule->{data}) eq 'CODE' ? $rule->{data}($query, @{$rule->{placeholders}}) : $rule->{data};
				my $mid_data = defined $start_data[1] ? \@start_data : $start_data[0];

				if (ref($mid_data) eq 'HASH') {
					print localtime()." [$$] Converting data from hash.\n" if $debug;
					$data = [ map { [ $_, $mid_data->{$_} ] } sort keys %$mid_data ];
				} elsif ((ref($mid_data) eq 'ARRAY') && (ref($mid_data->[0]) ne 'ARRAY')) {
					print localtime()." [$$] Converting data from a flat array.\n" if $debug;
					$data = [ map { [ $_ ] } @$mid_data ];
				} elsif (ref($mid_data) eq '') {
					$data = [ [ $mid_data ] ];
				} else {
					$data = $mid_data;
				}
			}

			if (
				(not defined $storage{data_sent}) && (not defined $definitions) && (not defined $data) &&
				( ($i == $#rules) || (defined $rule->{dbh}) || (defined $rule->{forward}) )
			) {
				if (defined $rule->{dbh}) {
					$myserver->setDbh($rule->{dbh});
				} elsif (defined $rule->{dsn}) {
					if (ref($rule->{dsn}) eq 'ARRAY') {
						print localtime()." [$$] Connecting to DSN $rule->{dsn}->[0].\n" if $debug;
						$myserver->setDbh(DBI->connect(@{$rule->{dsn}}));
					} else {
						print localtime()." [$$] Connecting to DSN $rule->{dsn}.\n" if $debug;
						$myserver->setDbh(DBI->connect($rule->{dsn}, get('dsn_user'), get('dsn_password')));
					}
				}
				if (not defined get('dbh')) {
					error("No --dbh specified. Can not forward query.",1235, 42000);
				} elsif ($command == DBIx::MyServer::COM_QUERY) {
					(my $foo, $definitions, $data) = $myserver->comQuery($outgoing_query);
				} elsif ($command == DBIx::MyServer::COM_INIT_DB) {
					(my $foo, $definitions, $data) = $myserver->comInitDb($outgoing_query);
				} else {
					error("Don't know how to handle command $command.",1235, 42000);
				}
				$storage{data_sent} = 1;
			}

			if (defined $definitions) {
				print localtime()." [$$] Sending definitions.\n" if $debug;
				$myserver->sendDefinitions($definitions);
				$storage{data_sent} = 1;
			}

			if (defined $data) {
				print localtime()." [$$] Sending data.\n" if $debug;
				$myserver->sendRows($data);
				$storage{data_sent} = 1;
			}

			if (defined $rule->{after}) {
				print localtime()." [$$] Executing 'after' for rule $i\n" if $debug;
				$rule->{after}($query, @{$rule->{placeholders}})
			}

			last if defined $storage{data_sent};
		}

	}

	print localtime()." [$$] Exit.\n" if $debug;
	exit;
}

sub set {
	my ($name, $value) = @_;
	$storage{$name} = $value;
	if ($name eq 'dsn') {
		if (defined $value) {
			my $dbh;
			if (ref($value) eq 'ARRAY') {
				print localtime()." [$$] Connecting to DSN $value->[0].\n" if $debug;
				$dbh = DBI->connect(@{$value});
			} else {
				print localtime()." [$$] Connecting to DSN $value.\n" if $debug;
				$dbh = DBI->connect($value, get('dsn_user'), get('dsn_password'));
			}
			$storage{myserver}->setDbh($dbh);
			$storage{dbh} = $dbh;
		} else {
			$storage{myserver}->setDbh(undef);
			$storage{dbh} = undef;
		}
	}
	return 1;
}

sub error {
	my $myserver = get('myserver');
	$myserver->sendError(@_);
	$storage{data_sent} = 1;
}

sub error_dbi {
	my $myserver = get ('myserver');
	my $dbh = $_[0] || get ('dbh');
	$myserver->sendErrorFromDBI($dbh);
	$storage{data_sent} = 1;
}

sub ok {
	my $myserver = get('myserver');
	if ($_[0] == 1) {
		$myserver->sendOK();
	} else {
		$myserver->sendOK(@_);
	}
	$storage{data_sent} = 1;
}

sub disconnect { exit; }

sub get {
	return $storage{$_[0]};
}



( run in 1.454 second using v1.01-cache-2.11-cpan-437f7b0c052 )