Net-Wire10

 view release on metacpan or  search on metacpan

lib/Net/Wire10.pm  view on Meta::CPAN

my %COLUMN_FLAGS = %{*COLUMN_FLAGS};

# Constructor
sub new {
	my $class = shift;
	my %args = @_;

	my $self = bless {
		host             => $args{host},
		port             => $args{port} || DEFAULT_PORT_NUMBER,
		database         => $args{database},
		user             => $args{user},
		password         => $args{password},
		connect_timeout  => defined($args{connect_timeout}) ? $args{connect_timeout} : DEFAULT_CONNECT_TIMEOUT,
		query_timeout    => defined($args{query_timeout}) ? $args{query_timeout} : DEFAULT_QUERY_TIMEOUT,
		flags            => $args{flags} || 0,
		debug            => $args{debug} || 0,
	}, $class;
	$self->_reset_connection_state;
	$self->_reset_command_state;
	return $self;
}

# Initializes the connection to the server.
# An error is raised by an inner class if already connected
sub connect {
	my $self = shift;
	$self->_connect;
	$self->_perform_handshake;
	$self->_perform_authentication;
}

# Sends an SQL query
sub query {
	my $self = shift;
	my $sql = shift;

	return $self->_execute_query($sql, 0);
}

# Sends an SQL query, but does not read any rows in the result set
sub stream {
	my $self = shift;
	my $sql = shift;

	return $self->_execute_query($sql, 1);
}

# Creates a prepared statement object.
sub prepare {
	my $self = shift;
	my $sql = shift;

	return Net::Wire10::PreparedStatement->new($self, $sql);
}

# Sends a wire protocol ping
sub ping {
	my $self = shift;

	$self->_check_streaming;
	$self->_check_connected;
	$self->_reset_command_state;
	$self->_reset_timeout($self->{connect_timeout});

	return $self->_execute_command(COMMAND_PING, '', undef);
}

# Close the database connection
sub disconnect {
	my $self = shift;
	my $socket = $self->{socket};
	my $select = $self->{io_select};

	eval {
		if ($socket) {
			if ($select) {
				if ($select->can_write(TIMEOUT_GRANULARITY)) {
					my $body = COMMAND_QUIT;
					$self->_send_mackets($body, 0, MACKET_COMMAND);
				}
			}
			$socket->close;
		}
	};
	warn $@ if $@;

	$self->_reset_command_state;
	$self->_reset_connection_state;

	return undef;
}

# Cancels a running query
sub cancel {
	my $self = shift;
	$self->{cancelling} = 1;
	return undef;
}

# Get the connection id
sub get_connection_id {
	my $self = shift;
	return $self->{server_thread_id};
}

# Get the server version string
sub get_server_version {
	my $self = shift;
	return $self->{server_version};
}

# Is the driver currently connected?
# If a fatal error has occurred, this will return false
sub is_connected {
	my $self = shift;
	return defined($self->{socket});
}

# Return the current error object, if any
sub get_error_info {
	my $self = shift;
	return $self->{error};
}

# Reset the time remaining counter before executing a command
sub _reset_timeout {
	my $self = shift;
	my $seconds = shift;
	if ($seconds == 0) {
		$self->{command_expire_time} = 0;
		return undef;
	}
	$self->{command_expire_time} = time + $seconds;
	return undef;
}

# Return the number of seconds left before the current
# operation should time out
sub _check_time_remaining {
	my $self = shift;
	return 0 if $self->{command_expire_time} == 0;
	my $remaining = $self->{command_expire_time} - time;
	$self->_fatal_error("Timeout while receiving data") if $remaining < 1;
	return $remaining;
}

# Fail if not connected anymore, due for example to a fatal error
sub _check_connected {
	my $self = shift;
	$self->_fatal_error("Not connected") unless defined($self->{socket});
}

# Fail if currently connected to a streaming data reader
sub _check_streaming {
	my $self = shift;
	$self->_vanilla_error("Connection is busy streaming") if $self->{streaming};
}

# Connects to the database server
sub _connect {
	my $self = shift;

	$self->_vanilla_error("Already connected") if defined($self->{socket});
	$self->_fatal_error("No host given") if length($self->{host}) == 0;
	$self->_fatal_error("No port given") if length($self->{port}) == 0;

	# Connect timeout.
	$self->_reset_timeout($self->{connect_timeout});

	my $socket;
	printf "Connecting to: %s:%d/tcp\n", $self->{host}, $self->{port} if $self->{debug} & 1;
	$socket = IO::Socket::INET->new(
		Proto    => 'tcp',
		PeerAddr => $self->{host},
		PeerPort => $self->{port},
		Timeout  => $self->_check_time_remaining
	) or $self->_fatal_error("Couldn't connect to $self->{host}:$self->{port}/tcp: $@");
	$socket->autoflush(1);
	$socket->timeout(TIMEOUT_GRANULARITY);
	$self->{socket} = $socket;
	$self->{io_select} = new IO::Select($self->{socket});

	$self->_reset_command_state;
}

# When a fatal error occurs, tear down TCP
# connection and set command state to indicate error.
sub _fatal_error {
	my $self = shift;
	my $msg = shift || '';

	$self->disconnect;

	$self->{error} = Net::Wire10::Error->new(-1, '', $msg) unless defined ($self->{error});
	$self->{error}->{message} = $msg if length($msg) > 0;

	die $self->{error}->{message};
}

# When a non-fatal error occurs, just throw it.
sub _vanilla_error {
	my $self = shift;
	my $msg = shift || '';

	$self->{error} = Net::Wire10::Error->new(-1, '', $msg) unless defined ($self->{error});
	$self->{error}->{message} = $msg if length($msg) > 0;

	die $self->{error}->{message};
}

# Receives data from the network and reassembles fragmented packets
sub _receive_packet_data {
	my $self = shift;
	my $socket = $self->{socket};
	my $io_select = $self->{io_select};
	my $data;

lib/Net/Wire10.pm  view on Meta::CPAN

		# Unsupported:  RESERVED
		FLAG_SECURE_CONNECTION +
		# Unsupported:  MULTI_STATEMENTS
		# Necessary to avoid server abort:  FLAG_MULTI_RESULTS
		FLAG_MULTI_RESULTS;
		# Unsupported:  SSL_VERIFY_SERVER_CERT
		# Unsupported:  REMEMBER_OPTIONS

	# Optional flags
	my $customizable_flags =
		FLAG_FOUND_ROWS +
		FLAG_NO_SCHEMA +
		FLAG_ODBC +
		FLAG_IGNORE_SPACE +
		FLAG_INTERACTIVE;

	my $flags = $driver_flags | ($customizable_flags & $self->{flags});
	my $body .= Net::Wire10::Util::encode_my_uint($flags, 4);
	# Max macket size.  Completely disregarded by the server,
	# which just overflows macket data onto other mackets when
	# sending and accepts overflowed data when receiving.
	# See also note in _send_mackets().
	$body .= Net::Wire10::Util::encode_my_uint(0x01000000, 4);
	# Character set; hardcoded to UTF-8, used in _parse_column_info_macket().
	$body .= chr(UTF8_GENERAL_CI);
	# 23 bytes filler.
	$body .= "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
	# Null-terminated: user name
	$body .= $self->{user} . "\0";
	if (length($self->{password}) > 0) {
		$body .= "\x14" . Net::Wire10::Password->scramble($self->{password}, $self->{salt});
	} else {
		$body .= "\0";
	}
	if (defined($self->{database})) {
		# Documentation says there should be a filler here,
		# but other clients don't send that.
		#$body .= "\0";
		# Null-terminated: initial default database name.
		$body .= $self->{database};
		$body .= "\0";
	}
	$self->_send_mackets($body, 1, MACKET_AUTHENTICATE);
}

# Sends old format client authentication
sub _send_old_password {
	my $self = shift;
	my $body = Net::Wire10::Password32->scramble(
		$self->{password}, $self->{salt_old}, 1
	) . "\0";
	$self->_send_mackets($body, 3, MACKET_AUTHENTICATE);
}

# Execute a SQL command
sub _execute_query {
	my $self = shift;
	my $sql = shift;
	my $wantstream = shift;

	$self->_check_streaming;
	$self->_check_connected;
	$self->_reset_command_state;
	$self->_reset_timeout($self->{query_timeout});
	$self->{streaming} = 1 if $wantstream;

	my $iterator = Net::Wire10::Results->new($self);
	$self->{streaming_iterator} = $iterator if $wantstream;

	# The protocol is configured to always use UTF-8 during handskake, to
	# avoid messing with all the local character sets.  Therefore any input
	# string needs to be automatically converted if it is not already UTF-8.
	utf8::upgrade($sql) if defined($sql) and not utf8::is_utf8($sql);

	printf "Executing query: %s%s\n", substr($sql, 0, 100), length($sql) >= 100 ? " ..." : "" if $self->{debug} & 1;

	$self->_execute_command(COMMAND_QUERY, $sql, $iterator);
	return $iterator;
}

# Send a protocol command message
sub _execute_command {
	my $self = shift;
	my $command = shift;
	my $param = shift;
	my $iterator = shift;

	# Abort early if the driver is no longer connected.
	$self->_check_connected;

	# Strip the utf8 flag from the string,
	# otherwise the socket send() complains.
	Encode::_utf8_off($param);

	# Send the SQL command
	my $body = $command . $param;
	$self->_send_mackets($body, 0, MACKET_COMMAND);
	$self->{expected_macket} = MACKET_OK | MACKET_RESULT_SET_HEADER;

	# Receive the result from the database
	my $macket = $self->_next_macket;

	if ($macket->{type} == MACKET_ERROR) {
		$self->_detach_results($iterator);
		$self->_parse_error_macket($macket);
		$self->_vanilla_error;
	}
	if ($macket->{type} == MACKET_RESULT_SET_HEADER) {
		my $pos = MACKET_HEADER_LENGTH;
		$self->_parse_result_set_header_macket($macket, \$pos);
		$self->_retrieve_column_info($iterator);
		$self->_retrieve_results($iterator) unless $self->{streaming};
	}
	if ($macket->{type} == MACKET_OK) {
		$self->_parse_ok_macket($macket, $iterator);
		$self->_detach_results($iterator);
	}
}

# Reads and interprets result set header
sub _parse_result_set_header_macket {
	my $self = shift;
	my $macket = shift;
	my $pos = shift;

	$self->{no_of_columns} = $self->_decode_lcb_or_fail($macket->{buf}, $pos);
	printf "Number of columns: %d\n", $self->{no_of_columns} if $self->{debug} & 1;
	# Optionally the "extra" field is in the macket
	my $macket_length = length($macket->{buf}) - MACKET_HEADER_LENGTH;
	if ($macket_length - 1 > $pos) {
		my $extra = $self->_decode_lcb_or_fail($macket->{buf}, $pos);
		printf "Extra information (ignored): %d\n", $extra if $self->{debug} & 1;
	}
}

# Reads and stores error message, sql state and error code
sub _parse_error_macket {
	my $self = shift;
	my $macket = shift;
	my $extra = shift || '';

	if ($macket->{type} != MACKET_ERROR) {
		$self->_fatal_error("Expected error macket");
	}

	# skip macket header
	my $pos = MACKET_HEADER_LENGTH;
	# skip macket type
	$pos += 1;
	my $left = length($macket->{buf}) - $pos;
	$self->_fatal_error("Truncated error macket") if $left < 2;
	# error code
	my $code = Net::Wire10::Util::decode_my_uint($macket->{buf}, \$pos, 2);
	# Documentation says there always is a SQLSTATE marker here,
	# but that is not true.
	my $sqlstate_marker = substr($macket->{buf}, $pos, 1);
	my $sqlstate = '';
	if ($sqlstate_marker eq '#') {
		# skip SQL state marker
		$pos += 1;
		# read SQL state
		my $sqlstate = substr($macket->{buf}, $pos, 5);
		$pos += 5;
	}
	# message
	my $message = substr($macket->{buf}, $pos);
	Encode::_utf8_on($message);

	# create error info object
	$self->{error} = Net::Wire10::Error->new($code, $sqlstate, $extra . $message);
}

lib/Net/Wire10.pm  view on Meta::CPAN

# interrupted at any point by an error from the server
sub _retrieve_row_data {
	my $self = shift;
	my $iterator = shift;

	$self->{expected_macket} = MACKET_ROW_DATA + MACKET_EOF;
	my $macket = $self->_next_macket;

	if ($macket->{type} == MACKET_ERROR) {
		$self->_parse_error_macket($macket, "Server reported error while reading row data: ");
		$self->_fatal_error;
	}
	if ($macket->{type} == MACKET_ROW_DATA) {
		# Note: The manual does not specify how the server fragments data.
		# One possibility would be that if the server sends data that fits
		# exactly on a macket boundary, then sends an additional completely
		# empty macket, to allow the client to assume that a maxed out macket
		# means that more fragments will follow.  Another possibility would
		# be that the server expects the client to deduce whether more mackets
		# are coming based on the contents of each macket, since fragmented
		# mackets only occur for data mackets which have an additional length
		# indicator (for the field data) inside the macket.  For the usual
		# lack of documentation, the method used below is a guess.
		my $nasty = length($macket->{buf});
		while ($nasty == MACKET_HEADER_LENGTH + 0xffffff) {
			# Glue together to form proper macket
			# (with invalid macket_length).
			printf "Fragmented macket found, retrieving one more fragment macket.\n" if $self->{debug} & 1;
			$self->{expected_macket} = MACKET_MORE_DATA;
			my $next_macket = $self->_next_macket;
			if ($macket->{type} == MACKET_ERROR) {
				$self->_parse_error_macket($macket, "Server reported error while reading more row data: ");
				$self->_fatal_error;
			}
			$nasty = length($next_macket->{buf});
			# Remove header from next fragment.
			substr($next_macket->{buf}, 0, MACKET_HEADER_LENGTH, "");
			# Concatenate macket contents.
			$macket->{buf} .= $next_macket->{buf};
		}
		# Remove header from first fragment.
		substr($macket->{buf}, 0, MACKET_HEADER_LENGTH, "");
		printf "Unshifting %d byte(s) of row data onto queue.\n", length($macket->{buf}) if $self->{debug} & 1;
		push(@{$iterator->{row_data}}, $macket->{buf});
		# More data may be available.
		return 1;
	}
	if ($macket->{type} == MACKET_EOF) {
		# Read EOF macket
		$self->_parse_eof_macket($macket, $iterator);
		$self->_detach_results($iterator);
		# No more data available.
		return 0;
	}
}

# Disconnect result set from driver, must only be done after all results have been read
sub _detach_results {
	my $self = shift;
	my $iterator = shift;
	$iterator->{wire}->{streaming_iterator} = undef;
	$iterator->{wire}->{streaming} = 0;
	$iterator->{wire} = undef;
}

# Reads and interprets OK, saving the number of affected rows,
# the insert id, and the server message.  Returns the number of
# affected rows
sub _parse_ok_macket {
	my $self = shift;
	my $macket = shift;
	my $iterator = shift;

	# Because affected rows is a Length Coded Binary
	# we use pointers to the position in the macket.
	# The position is updated in each method called
	# to ensure that the macket is read correctly.
	# First, skip macket header and macket type.
	my $pos = MACKET_HEADER_LENGTH + 1;
	printf "\n%s():\n", (caller(1))[3] if $self->{debug} & 1;
	# Affected rows
	my $affected = $self->_decode_lcb_or_fail($macket->{buf}, \$pos);
	printf "  -> Affected rows: %d\n", $affected if $self->{debug} & 1;
	# Insert id
	my $raw_id = $self->_skip_lcb_or_fail($macket->{buf}, \$pos);
	printf "  -> Insert id (decode postponed): %d byte(s)\n", length($raw_id) if $self->{debug} & 1;
	my $left = length($macket->{buf}) - $pos;
	$self->_fatal_error("Truncated OK macket") if $left < 4;
	# Server status
	my $status = Net::Wire10::Util::decode_my_uint($macket->{buf}, \$pos, 2);
	printf "  -> Server status flags (ignored): 0x%x\n", $status if $self->{debug} & 1;
	# Warning count
	my $warnings = Net::Wire10::Util::decode_my_uint($macket->{buf}, \$pos, 2);
	printf "  -> Warning count: %d\n", $warnings if $self->{debug} & 1;
	# Message
	my $message = substr($macket->{buf}, $pos);
	Encode::_utf8_on($message);
	printf "  -> Server message (ignored): %s\n", $message if $self->{debug} & 1;

	if (defined($iterator)) {
		$iterator->{no_of_affected_rows} = $affected;
		$iterator->{raw_insert_id} = $raw_id;
		$iterator->{warnings} = $warnings;
	}
}

# Reads and interprets EOF, saving relevant values.
sub _parse_eof_macket {
	my $self = shift;
	my $macket = shift;
	my $iterator = shift;

	my $pos = MACKET_HEADER_LENGTH + 1;
	my $left = length($macket->{buf}) - $pos;
	$self->_fatal_error("Truncated EOF macket") if $left < 4;
	printf "\n%s():\n", (caller(1))[3] if $self->{debug} & 1;
	# Warning count
	my $warnings = Net::Wire10::Util::decode_my_uint($macket->{buf}, \$pos, 2);
	printf "  -> Warning count: %d\n", $warnings if $self->{debug} & 1;
	# Server status
	my $status = Net::Wire10::Util::decode_my_uint($macket->{buf}, \$pos, 2);
	printf "  -> Server status flags: 0x%x\n", $status if $self->{debug} & 1;

lib/Net/Wire10.pm  view on Meta::CPAN

	my $self = shift;
	my $macket = shift;

	return ord(substr($macket->{buf}, 3, 1));
}

# Given a macket, extract length from its header
sub _extract_macket_length {
	my $self = shift;
	my $macket = shift;
	my $pos = 0;

	return Net::Wire10::Util::decode_my_uint($macket->{buf}, \$pos, 3);
}

# Wrap the skip_lcb library function in an error handler,
# catching errors and sending them to the driver's _fatal_error()
sub _skip_lcb_or_fail {
	my $self = shift;
	my $buf = shift;
	my $pos = shift;

	my $result = eval {
		return Net::Wire10::Util::skip_lcb($buf, $pos);
	};
	$self->_fatal_error($@) if $@;
	return $result;
}

# Wrap the decode_lcb library function in an error handler,
# catching errors and sending them to the driver's _fatal_error()
sub _decode_lcb_or_fail {
	my $self = shift;
	my $buf = shift;
	my $pos = shift;

	my $result = eval {
		return Net::Wire10::Util::decode_lcb($buf, $pos);
	};
	$self->_fatal_error($@) if $@;
	return $result;
}

# Wrap the decode_string library function in an error handler,
# catching errors and sending them to the driver's _fatal_error()
sub _decode_string_or_fail {
	my $self = shift;
	my $buf = shift;
	my $pos = shift;

	my $result = eval {
		return Net::Wire10::Util::decode_string($buf, $pos);
	};
	$self->_fatal_error($@) if $@;
	return $result;
}

# Resets the command execution status
sub _reset_command_state {
	my $self = shift;
	# Disconnect streaming iterator.
	$self->_detach_results($self->{streaming_iterator}) if defined($self->{streaming_iterator});
	# Reset internal column counter.
	$self->{no_of_columns} = undef;
	# Reset error state.
	$self->{error} = undef;
	# Reset cancel flag.
	$self->{cancelling} = 0;
}

# Reset entire connection
sub _reset_connection_state {
	my $self = shift;
	$self->{protocol_version} = undef;
	$self->{server_version} = undef;
	$self->{salt} = '';
	$self->{packet_buffer} = '';
	$self->{packet_goal} = undef;
	$self->{packet_read} = 0;
	$self->{socket} = undef;
	$self->{io_select} = undef;
	$self->{expected_macket} = undef;
	$self->{macket_queue} = [];
	$self->{command_expire_time} = undef;
}

# Dumps the packet to standard output, useful for debugging
sub _dump_packet {
	my $self = shift;
	return unless $self->{debug} & 4;
	my $packet = shift;
	my $str = sprintf "\n%s():\n", (caller(1))[3];
	my $len = length($packet);
	my $skipped = 0;
	my $pos = -16;
	while ($packet =~ /(.{1,16})/sg) {
		$pos += 16;
		unless ($self->{debug} & 8) {
			if (($len > 528) && ($pos > 256) && ($len - $pos > 256)) {
				if (! $skipped) {
					print "\n\n" . ' ' x 25 . "... snip more data ...\n";
					$skipped = 1;
				}
				next;
			}
		}
		my $line = $1;
		$str .= join ' ', map {sprintf '%02X', ord $_} split //, $line;
		$str .= '   ' x (16 - length $line);
		$str .= '  ';
		$str .= join '', map {
			sprintf '%s', (/[\w\d\*\,\?\%\=\'\;\(\)\.-]/) ? $_ : '.'
		} split //, $line;
		print $str;
		$str = "\n";
	}
	print $str;
}

# Dumps the macket to standard output, useful for debugging.
sub _dump_macket {
	my $self = shift;

lib/Net/Wire10.pm  view on Meta::CPAN

	my $len = length($macket->{buf});
	my $skipped = 0;
	my $pos = -16;
	while ($macket->{buf} =~ /(.{1,16})/sg) {
		$pos += 16;
		unless ($self->{debug} & 16) {
			if (($len > 528) && ($pos > 256) && ($len - $pos > 256)) {
				if (! $skipped) {
					print "\n\n" . ' ' x 25 . "... snip more data ...\n";
					$skipped = 1;
				}
				next;
			}
		}
		my $line = $1;
		$str .= ' ' x 13 if substr($str, -1, 1) eq "\n";
		$str .= join ' ', map {sprintf '%02X', ord $_} split //, $line;
		$str .= '   ' x (16 - length $line);
		$str .= '  ';
		$str .= join '', map {
			sprintf '%s', (/[\w\d\*\,\?\%\=\'\;\(\)\.-]/) ? $_ : '.'
		} split //, $line;
		print $str;
		$str = "\n";
	}
	print $str;
}



package Net::Wire10::Results;

use strict;
use warnings;
use Encode;

# Constructor
sub new {
	my $class = shift;
	my $wire = shift;
	return bless {
		wire => $wire,
		column_info => [],
		row_data => []
	}, $class;
}

# Gets next row as an array
sub next_array {
	my $self = shift;
	my @result;
	my $row;
	my $wire = $self->{wire};

	# Note: A die() from this context often brings an application down.
	#       For disconnected result sets, row data could be integrity
	#       checked during query() to simplify error handling for
	#       applications.

	if (defined($wire)) {
		# In streaming mode, fetch a row
		return undef unless $wire->_retrieve_row_data($self);
	}

	# Return unless there is another row available
	return undef if scalar(@{$self->{row_data}}) == 0;

	$row = shift(@{$self->{row_data}});
	my $pos = 0;
	for (my $i = 1; $i <= scalar(@{$self->{column_info}}); $i++) {
		my $fieldvalue = eval {
			return Net::Wire10::Util::decode_string($row, \$pos);
		};
		$self->_abort($@) if $@;
		my $collation = $self->{column_info}->[$i - 1]->{"collation"};
		$Net::Wire10::UTF8_COLLATIONS{$collation} and Encode::_utf8_on($fieldvalue);
		push @result, $fieldvalue;
	}

	return \@result;
}

# Leave driver in a consistent state and then die.
sub _abort {
	my $self = shift;
	my $msg = shift;
	eval {
		$self->spool;
	};
	die $msg;
}

# Gets next row as a hash
sub next_hash {
	my $self = shift;
	my $row = $self->next_array;
	return undef unless defined $row;
	my %result = map { $_->{name} => shift(@{$row}) } @{$self->{column_info}};
	return \%result;
}

# Retrieve and store remaining rows
sub spool {
	my $self = shift;
	my $wire = $self->{wire};
	return undef unless defined($wire);
	$wire->_retrieve_results($self);
	return undef;
}

# Retrieve and dispose of remaining rows
sub flush {
	my $self = shift;
	while ($self->next_array) {};
	return undef;
}

# Get names or other information on every column
sub get_column_info {
	my $self = shift;
	my $what = shift;

lib/Net/Wire10.pm  view on Meta::CPAN

# http://bugs.mysql.com/bug.php?id=1337
my $workaround_bug_1337 = qr/^[0-9]+$/;

# Assemble and execute prepared statement.
sub _execute {
	my $self = shift;
	my $wantstream = shift;
	my $wire = $self->{wire};

	# Replace tokens with parameters.
	my $prepared = '';
	my $i = 0;
	my $has_charset = 0;
	foreach my $token (@{$self->{tokens}}) {
		# Look for a ? token.
		if ($token ne '?') {
			$prepared .= $token;
			# Look for character set associations for upcoming ? tokens.
			my $first = substr($token, 0, 1);
			$has_charset = 0 if $first !~ $whitespace;
			$has_charset = 1 if $first eq '_';
			# Fast-forward to next token.
			next;
		}

		# Fetch and quote parameter, or add NULL for undef.
		my $value = $self->{params}->[$i++];
		unless (defined($value)) {
			$prepared .= 'NULL';
			next;
		}
		$value = Net::Wire10::Util::quote($value) if $value !~ $workaround_bug_1337;

		# If input is binary, tell the database server that it's binary.
		# It usually doesn't matter much because binary data is usually
		# used in a binary context anyway, but it fixes various uncommon
		# scenarios such as doing a CONVERT on a binary string.
		$prepared .= '_binary' unless utf8::is_utf8($value) or $has_charset;

		# If input is binary, make sure that Perl doesn't try to translate it
		# from iso8859-1 (which it isn't) to Unicode when the string is joined
		# with actual Unicode text to form a MySQL query, which is a character
		# array of mixed text and binary data.
		Encode::_utf8_on($value) unless utf8::is_utf8($value);

		# Add parameter in place of token.
		$prepared .= $value;
	}

	print "Assembled statement: " . $prepared . "\n" if $wire->{debug} & 1;
	return $wire->stream($prepared) if $wantstream;
	return $wire->query($prepared);
}

# Run the prepared statement and spool results.
sub query {
	my $self = shift;
	return $self->_execute(0);
}

# Run the prepared statement and return a result object for streaming.
sub stream {
	my $self = shift;
	return $self->_execute(1);
}



package Net::Wire10::Error;

use strict;
use warnings;

# Constructor
sub new {
	my $class = shift;
	my $code = shift;
	my $state = shift;
	my $msg = shift;

	return bless {
		code => $code,
		state => $state,
		message => $msg,
	}, $class;
}

# Get the error code
sub get_error_code {
	my $self = shift;
	return $self->{code};
}

# Get the error SQL state designation
sub get_error_state {
	my $self = shift;
	return $self->{state} ? $self->{state} : '';
}

# Get the error message
sub get_error_message {
	my $self = shift;
	return $self->{message};
}



package Net::Wire10::Util;

use strict;
use warnings;

use constant LCB_NULL       => 251;
use constant LCB_UINT16     => 252;
use constant LCB_UINT24     => 253;
use constant LCB_UINT64     => 254;
use constant UINT8_LENGTH   => 1;
use constant UINT16_LENGTH  => 2;
use constant UINT24_LENGTH  => 3;
use constant UINT32_LENGTH  => 4;
use constant UINT64_LENGTH  => 8;

lib/Net/Wire10.pm  view on Meta::CPAN

  MySQL uses port 3306,
  Sphinx uses port 9306,
  Drizzle uses port 4427.

The default is 3306 (MySQL), which is the most commonly used at the moment.

Sphinx needs to be at least version 0.9.9-rc2 for SphinxQL to work.  There's more information about SphinxQL here: L<http://sphinxsearch.com/docs/current.html#sphinxql>.

=item database

Name of the initial default database, eg the database used for a query when that query does not specifically mention a database.  The server may deny login if a database is given and the user does not have the necessary privileges to access that data...

=item user

Username for identifying the service or user to the database server.  This will show up in the process list, which is useful if you need to see what or who is hogging resources on the server.

=item password

Password for authenticating the service or user to the database server.

=item connect_timeout

How long to wait before a connection attempt fails.

=item command_timeout

How long to wait before a query is aborted.

=item debug

Various informational messages will be printed to the console if a value of 1 is given.  The exchanged network protocol messages ("mackets") will be printed to the console if a value of 2 is given.  The exchanged TCP packets will be printed to the co...

=back

=head3 connect

Establishes or re-establishes a connection to the server.

=head3 ping

Pings the daemon on the connected server over the wire protocol.

After connecting, a ping() is useful to ensure that the connection is still alive.  The current status of the connection can be looked up using is_connected().

=head3 query

The query() method transmits the specified SQL string to the server and obtains the response, including the full set of results.

A result set is returned.  The obtained result set will be disconnected from the driver.  Being disconnected means that after retrieving the result set, you can fire more queries or close the connection.

Use query() when a small amount of records is needed for use at an arbitrary later point in time.  If you want to stream data to a live result set, including large amounts of data, see stream().

=head3 stream

The stream() method transmits the specified SQL string to the server, and obtains initial information about the response, but does not begin downloading data.

A result set is returned.  The obtained result set will be live with the driver.  After retrieving the result set, you must traverse all of its rows before you can fire another query.

Use stream() for large result sets, as it has a smaller memory footprint compared to query().  If you want to download data to a disconnected result set, use query().

Note that stream() will lock the driver until the whole result set has been retrieved.  To fetch another set of results while streaming to a live result set, create another driver object.

Also note that if you are using MySQL with the default storage engine, MyISAM, the entire table on the server will be locked for the duration of the live result set, that is until all rows have been retrieved.

=head3 prepare

Given a SQL string, returns a prepared statement.

Prepared statements are useful for:

=over 4

=item Inserting binary data in the database.

For each parameter, an indication of whether the parameter should be considered text or binary data can be given.

=item Avoiding bugs that could lead to SQL injections.

Separating query logic and parameters can be beneficial in securing against SQL injections.  Because query parameters do not have to be manually quoted, there's less risk of forgetting to quote or using an insecure quoting mechanism.

=item Reusing query logic with multiple sets of parameters.

Prepared statements can be re-executed with a new set of parameters.

=back

Database and database object names cannot be parameterized, because schema identifiers need to be escaped in a different way than literal strings, and there is currently no additional marker implemented for this purpose.  Identifiers can be manually ...

=head3 cancel (thread safe)

Cancels the running query.

Safe to call asynchronously from a signal handler.  Example:

  # Press CTRL-C to cancel this query.
  {
    local $SIG{INT} = sub { $wire->cancel; };
    $wire->query("SELECT SLEEP(10)");
  }

Safe to call asynchronously from another thread (thread safe).  Example:

  use threads;
  use threads::shared;

  # we've chosen to use the threads::shared library, therefore
  # we need to share the driver object with this library.
  share $wire->{cancelling};

  # abort on some condition (sleep() for demonstration purposes)
  threads->create(sub {
    $wire->cancel if sleep 2;
  })->detach;

  # run query for 10 seconds (will be interrupted by above thread after 2 sec)
  $wire->query("SELECT SLEEP(10)");

Run the example above to see how cancel() works to interrupt the SLEEP(10) statement before it finishes, after only 2 seconds instead of the full 10.

Notice that the entire C<$wire> object is not shared above.  This is because threads::shared fails to share some of the driver's internal structures.  Luckily, C<cancel()> only needs the C<$wire->{cancelling}> variable to be shared between threads fo...

lib/Net/Wire10.pm  view on Meta::CPAN

=head3 is_connected

Returns true after connect() has been called, for as long as no fatal errors has occurred.

After a fatal error, is_connected returns false.

A successful call to connect() causes is_connected to return true once again.

=head3 get_error_info

Returns a L<Net::Wire10::Error> object if an error code and message is available from the server or the driver.

=head3 get_server_version

After connecting, this returns the server version.

=head3 get_connection_id

After connecting, this returns a unique identifier for the thread on the server that handles the current connection.

The connection identifier is useful for logging and debugging, killing queries or connections, for naming pseudo-temporary tables or other objects, and more.

=head3 disconnect

Transmits a goodbye message to the server to indicate intentions, then closes the underlying socket.

=head2 Features in I<Net::Wire10::PreparedStatement>

=head3 set_parameter

Sets the parameter at a given index to a given value.  The index corresponds to the relative position of a "?" marker in the prepared statement, with the first "?" marker being 1.

  $ps->set_parameter(2, 'Hello World');

There is no need to quote the parameter value, this is done automatically.

Binary data such as for example JPEG image files can also be added:

  $ps->set_parameter(3, $bindata, DATA_BINARY);

If the last parameter is specified as DATA_BINARY, the value given is taken as a binary string.  Otherwise, the value is taken as either an iso8859-1 string or a Unicode string, depending on what Perl thinks about the string.

=head3 clear_parameter

Clears one or more parameters, or all parameters if nothing was specified.

  $ps->clear_parameter(1, 3);

=head3 get_token_count

Returns the number of "?" tokens found in the SQL initially used to create the prepared statement.

=head3 query

Execute the prepared statement using the parameters previously set with set_parameter().
All results are spooled before the call returns.

=head3 stream

Execute the prepared statement using the parameters previously set with set_parameter().
As soon as the initial metadata arrives from the database server, the call returns, and the results can be traversed in a streaming fashion.

=head2 Features in the I<Net::Wire10::Results> iterator

A Net::Wire10::Results object is returned when calling query() or stream().  Depending on which was used to execute the SQL, either a disconnected result set or a live (streaming) result set is returned.

=head3 next_array

The next_array() method returns a whole row, with individual field values packed into an array.
C<undef> is returned once all rows has been extracted.

  while (my $row = $results->next_array) {
    printf
      "Value 1: %s Value 2: %s Value 3: %s\n",
      $row->[0],
      $row->[1],
      $row->[2];
  }

When the retrieved columns has been specifically named in the SELECT statement (rather than using the C<SELECT *> wildcard), the position of each individual field in result set rows are known, and next_array() can be used to access field values based...

Column name and order for a C<SELECT *> query can be retrieved using get_column_info("name").

After calling next_array(), the row has been consumed.  There is currently no way to rewind and re-read a row, even for a disconnected result set.

=head3 next_hash

The next_hash() method returns a whole row, with individual field values packed into a hash.  The key of the hash is the name of the column that each field belongs to.
C<undef> is returned once all rows has been extracted.

  while (my $row = $results->next_hash) {
    printf
      "Id: %s Name: %s Data: %s\n",
      $row->{id},
      $row->{name},
      $row->{data};
  }

Using next_hash() instead of next_array() usually makes the code a bit more readable, especially in cases where a SELECT with column names is not nearby.

After calling next_hash(), the row has been consumed.  There is currently no way to rewind and re-read a row, even for a disconnected result set.

=head3 spool

Reads the remaining rows of the result set, in effect turning a streaming result set into a disconnected result set.

=head3 flush

Reads the remaining rows of the result set and discards them.  When done on a live result set, this frees the driver for use.

=head3 get_column_info

Return the names and other information of the result set's columns as an array.

If you want all information, call get_column_info with no parameters:

  my $column_info = $wire->get_column_info;
  my $first_col_name = $column_info[0]->{name};

If you just need the count of columns, you can ask Perl to reduce the array with "scalar":

  my $nr_cols = scalar $wire->get_column_info;

If you need just the name of each column, specify the wanted metadata item:

  my $column_names = $wire->get_column_info("name");
  my $first_col_name = $column_names[0];

The available metadata is:
  * name (column name or alias)
  * object (database object containing column, typically a TABLE or VIEW name)
  * orig_database (source database where data originally came from, if available)
  * orig_table (source table where data originally came from, if available)
  * orig_column (source column where data originally came from, if available)
  * collation (either binary, or utf8 with a given sort order and case sensitivity)
  * data_type (SQL data type)
  * decimal_scale (number of digits after the decimal point)
  * flags (SQL data type specification and other items)

=head3 has_results

Returns true if the query returned a set of results.

=head3 get_no_of_affected_rows

Returns the number of rows influenced by the UPDATE, DELETE or similar query.

  my $affected = $wire->get_no_of_affected_rows;

=head3 get_no_of_selected_rows

Returns the number of rows in the result set of a SELECT or similar query.

  my $selected = $wire->get_no_of_selected_rows;

After consuming a row, the number of available rows will decrease, and get_no_of_selected_rows() will return one less.

Supported only for disconnected result sets, live/streaming result sets are unaware of the total number of records.  A streaming result set can be turned into a disconnected result set using spool(), after which the number of available rows becomes k...

=head3 get_insert_id

MySQL and Drizzle has the ability to choose unique key values automatically, by enabling auto_increment for a column.  When this happens, the newly assigned id value for the last inserted row is stored in this attribute.

=head3 get_warning_count

After a query, this returns the number of warnings generated on the server.  If the query is streaming to a live result set, an additional warning count is available after the last row of data has been read.

=head2 Features in the I<Net::Wire10::Error> object

After an error has occurred, call $wire->C<get_error_info()> to retrieve an Error object containing details of the problem.

=head3 get_error_code

Returns the server-reported (1xxx) error code, or a client (2xxx) error code.

=head3 get_error_state

Returns the SQL state code sent by the server when an error occurs, or HY000 for errors that does not have a standardized designation, or an empty string for client errors and some server errors.

=head3 get_error_message

Returns an error string sent by the server.  Usually contains a mix of text corresponding to the error code (see above), in whatever language the server is started with, and some factual information from the query that caused the error or similar con...

=head1 ERROR HANDLING

There are two kind of errors in Net::Wire10, fatal and non-fatal errors.
Fatal errors causes the connection to close, while non-fatal errors do not.

Non-fatal errors cause the active query to abort.

A query may also cause warnings and informational messages, these are per
default only reported summarily via get_warning_count().  The actual messages
can be retrieved out-of-band from the server with the SHOW WARNINGS command.

=head2 Example: catching errors

All errors can be caught with an eval {} construct.
To differentiate between a fatal and a non-fatal error, use is_connected().

  # Create driver and connect
  $wire = Net::Wire10->new(host=>'localhost', user=>'test', password=>'test');
  $wire->connect;

  # Execute nonsensical query
  eval { $wire->query('Argle-bargle, glyp-glof?!'); };
  warn $@ if $@;
  print ($wire->is_connected ? "is" : "is not") . " connected.";

=head2 Example: recognizing fatal errors

Here's a query that causes a fatal error:

  # Execute query that kills the current connection
  eval { $wire->query('KILL CONNECTION CONNECTION_ID()'); };
  warn $@ if $@;
  print ($wire->is_connected ? "is" : "is not") . " connected.";

After running the above code, it is necessary to reconnect the driver before doing additional work.

=head2 Example: reconnecting

Once a fatal error has happened, it is trivial to reestablish a connection to the server.

  # Reconnect if necessary
  $wire->connect unless $wire->is_connected;

lib/Net/Wire10.pm  view on Meta::CPAN


An even more economical method is to use a prepared statement to send the binary data in its raw form.

=head3 Protocol compression via zlib

There is not much documentation regarding protocol compression, therefore it has not been implemented.

It is possible to add compression at the network level instead using a tunnel rather than the protocol's own support, similarly to stunnel described below, if network bandwidth is at a premium.

Another option is to selectively compress data with zlib before handing it off to the driver, and using the function UNCOMPRESS() to expand the data again once it reaches the server:

  # Contains helper that does zlib compression.
  use Compress::Zlib;

  # Compress some data.
  my $compressed = compress('Hello World!');

  # Create a prepared statement and bind the compressed data.
  my $ps = $wire->prepare('SELECT UNCOMPRESS(?)');
  $ps->set_parameter(1, $compressed, DATA_BINARY);

  # Run the query to decompress data on the server side.
  $res = $ps->query();

=head3 Verification of authenticity using SSL certificates

Verifying the server requires SSL support which is not currently implemented.

Verifying the client is not supported by the wire protocol.

=head3 SSL/TLS encrypted communication

There is not much documentation regarding protocol support for SSL/TLS, therefore it has not been implemented.

It is possible to add encryption at the network level by using a SSL/TLS wrapper such as "stunnel".

Stunnel provides a richer set of features than the current MySQL protocol supports, such as certificate-based authentication of the client in addition to the server.

Integrated support would be desirable because of simpler error handling and the possibility of performing key creation and certificate signing tasks via SQL.

=head3 Protocol-level prepared statements

The protocol design includes protocol-level support for server-side prepared statements.  When implemented at a protocol level, these are faster than regular statements.  Prepared statements execute faster because query parameters such as text does n...

Another performance benefit of prepared queries is that it is possible to perform multiple executions of the same query.  The query logic is sent and parsed only once, saving a bit of network bandwidth and CPU cycles.

The current protocol design does not allow you to clear a single parameter at a time, but forces you to clear all parameters at once.  When reusing a prepared statement for multiple executions, parameters that do not change from one execution to the ...

Some drivers do not pipeline the prepare, parameter, execute and deallocate steps, instead waiting for a response from the server after each step.  Such drivers may actually run some queries slower when using prepared statements.

A detail in the protocol specification is that results from non-prepared statements are pushed to the client, whereas results from prepared statements are pulled by the client.  The pull mechanism makes it possible to stop in the middle of a result s...

Another, related detail in the protocol specification is that prepared statements are assigned a number each, whereas non-prepared statements are not.  Multiple prepared statements can therefore be active, running over the same connection at the same...

In short, protocol-level prepared statements would be very nice to have, but are currently not implemented.  Protocol details are described in the manual, but with gaps in the documentation.

There is also a purely SQL-driven interface to prepared statements, using the PREPARE, EXECUTE and DEALLOCATE PREPARE commands.  The SQL-driven interface does not have the same performance benefits as the protocol-level one does.  It can however be u...

There is also a client-side prepared statement interface in the driver, which it is highly recommended to use.

=head3 High-granularity streaming

Streaming data along the way as it is consumed or delivered by the client application can lead to dramatical decreases in memory usage.

Streaming outgoing data can be accomplished with server-side prepared statements, because the wire protocol allows prepared statement parameters to be sent one at a time, and even in chunks.  Chunking data presumably allows you to interleave data to ...

The driver API currently allows client applications to stream incoming data one row at a time.  The highest supported granularity for streaming is one whole row at a time.  Streaming at a higher granularity and interleaving chunks of incoming data is...

Streaming incoming data in a row-by-row fashion is also known in some drivers as "use_result mode".

=head3 Server-side cursors

Cursors provide a way of keeping open, navigating and optionally modifying a result set on the server, rather than having to transfer the entire result set to the client.

MySQL Server supports only a particular class of cursors known as read-only, forward-only cursors.  As the name implies, this kind of cursor cannot be navigated (also called "scrolled") or modified.

The semantics are exactly the same as when streaming incoming data with the L<stream>() call.  With cursors however, the server automatically creates a temporary table behind the scenes from which the data is read, thereby always using extra resource...

There is also a purely SQL-driven interface to cursors, which can be useful inside stored procedures.

Server-side cursors are part of the prepared statement protocol features (see above), and are therefore currently not supported.

=head3 Multiple statements per query

Currently unsupported.  Multiple statements can often cause more trouble than gain by making SQL injection (a security risk) much easier and in return providing diminutive performance gains when compared to other approaches.

If you want to run multiple queries, one method is to create two separate connections.  This can also give a performance boost, because one query does not wait for the other to finish.  Another related advantage is that multi-core servers can actuall...

Two or more queries can be started simultaneously without resorting to multiple threads on the client.  This is done by starting the queries using the stream() call, which does not wait for the server to return data.

See also notes on connection pooling, which can be a useful technique to avoid initialization overhead when creating multiple connections.

=head3 Multiple result sets per query

Currently unsupported.  Multiple result sets per query is documented to be incompatible with prepared statements (but the reason why is not).

=head3 Non-query related commands

Various non-query-related protocol commands that may be useful are not currently supported, for example COM_SHUTDOWN to initiate a server shutdown and COM_DEBUG to trigger the server into dumping a bunch of debug information.

=head3 Automatic reconnect

It is often desirable to be able to restart a database server without having all of its clients drop the ball.

Ensuring that the connection works before starting processing in the client often fixes a majority of the problem.  Client code can do a ping() and if necessary a connect() when it starts.  For web applications, this is one of the things that L<Apach...

The protocol design does not include a feature to notify clients when the server is going away.  They will usually only find out when performing a query or a ping.  In cases where the method mentioned above is not good enough, other tactics are avail...

An additional measure of reliability can be added by enclosing groups of database calls that are restartable in a guard block that catches database errors.  If a lost connection occurs, the entire group of calls can be restarted.

Queries that are not normally restartable include those that write data to the database with INSERT, UPDATE or similar.  A simple solution may be to wrap these in a transaction so that changes are automatically cancelled, thus allowing these queries ...

Some drivers have an automatic reconnect feature.  This is problematic because only the simplest of SQL queries are repeatable without causing unintended side effects.  Queries have to be repeated, because the protocol does not offer a mechanism for ...

In addition, the protocol does not normally carry information about transactions, and doing an automatic reconnect will silently terminate any running transactions.

If you can live with these problems, and want to avoid writing restartable client code or using transactions, there is the possibility of writing a wrapper around the database class which simply catches all calls to C<query()> and wraps them in somet...

The driver does not have such a feature built-in.

=head2 Out of scope features

=head3 Connection pooling

Connection pooling can improve performance by removing the need for a TCP handshake and SSL/TLS handshake (for encrypted sessions) when opening a connection, and can also alleviate TIME_WAIT problems in environments with many short-lived connections.

There is no particular advantage to putting pooling code in the driver core itself, but a standard implementation that comes along with the driver would be nice.  Currently no such thing exists for this driver.

If you need connection pooling for a web site, the L<Apache::DBI> module will provide this when used together with L<DBD::Wire10>.  It actually pools entire processes, including database connections which become persistent by virtue of C<Apache::DBI>...

=head3 The LOAD DATA LOCAL INFILE client statement

Poorly documented and therefore unsupported.

A simple workaround is to parse the CSV data on the client using L<Text::CSV> (or even L<DBD::CSV>), and upload it to the server in reasonably sized large chunks with the "extended insert" syntax.  

If necessary, you can also emulate the LOAD DATA LOCAL INFILE client statement using two other queries in unison, namely SELECT INTO DUMPFILE and LOAD DATA INFILE.



( run in 0.446 second using v1.01-cache-2.11-cpan-39bf76dae61 )