NAME¶
POE::Component::DBIAgent - POE Component for running asynchronous DBI calls.
SYNOPSIS¶
sub _start {
my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
$heap->{helper} = POE::Component::DBIAgent->new( DSN => [$dsn,
$username,
$password
],
Queries => $self->make_queries,
Count => 3,
Debug => 1,
);
# Queries takes a hashref of the form:
# { query_name => 'select blah from table where x = ?',
# other_query => 'select blah_blah from big_view',
# etc.
# }
$heap->{helper}->query(query_name =>
{ cookie => 'starting_query' },
session => 'get_row_from_dbiagent');
}
sub get_row_from_dbiagent {
my ($kernel, $self, $heap, $row, $cookie) = @_[KERNEL, OBJECT, HEAP, ARG0, ARG1];
if ($row ne 'EOF') {
# {{{ PROCESS A ROW
#row is a listref of columns
# }}} PROCESS A ROW
} else {
# {{{ NO MORE ROWS
#cleanup code here
# }}} NO MORE ROWS
}
}
DESCRIPTION¶
DBIAgent is your answer to non-blocking DBI in POE.
It fires off a configurable number of child processes (defaults to 3) and feeds
database queries to them via two-way pipes (or sockets ... however
POE::Wheel::Run is able to manage it). The primary method is
"query".
Usage¶
After initializing a DBIAgent and storing it in a session's heap, one executes a
"query" (or "query_slow") with the query name, destination
session (name or id) and destination state (as well as any query parameters,
optionally) as arguments. As each row of data comes back from the query, the
destination state (in the destination session) is invoked with that row of
data in its $_[ARG0] slot. When there are no more rows to return, the data in
$_[ARG0] is the string 'EOF'.
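The row-by-row delivery described above can be gathered into a complete result set. The following is only a sketch: "collect_row" and the "pending" heap key are hypothetical names, not part of DBIAgent, and the sketch assumes each query is given a distinct cookie.

```perl
use strict;
use warnings;

# Hypothetical helper, meant to be called from the destination state, e.g.
#   my $rows = collect_row($_[HEAP], $_[ARG0], $_[ARG1]);
# Rows are buffered per cookie; the buffered set is handed back once the
# string 'EOF' arrives. Returns nothing while more rows may follow.
sub collect_row {
    my ($heap, $row, $cookie) = @_;
    my $key = defined $cookie ? "$cookie" : '';

    if (ref $row) {
        push @{ $heap->{pending}{$key} }, $row;
        return;
    }

    return unless defined $row && $row eq 'EOF';

    # A query that produced no rows still ends with 'EOF': return an empty list ref.
    return delete($heap->{pending}{$key}) || [];
}
```

Buffering trades memory for convenience, so for large result sets it is probably better to process each row as it arrives.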
Not EVERY query should run through the DBIAgent. If you need to run a short
lookup from within a state, it can be a hassle to define a
whole separate state to receive its value and resume processing from there.
The determining factor, of course, is how long your query will take to
execute. If you are trying to retrieve one row from a properly indexed table,
use "$dbh->selectrow_array()". If there's a join involved, or
multiple rows, or a view, you probably want to use DBIAgent. If it's a longish
query and startup costs (time) don't matter to you, go ahead and do it
inline, but remember that your whole program suspends while waiting for the
result. If startup costs DO matter, use DBIAgent.
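For the short-lookup case, an inline call might look like the sketch below. It assumes $dbh is an already-connected DBI handle held by your session; "employee_name" is a hypothetical helper, and the "name" column is an assumption (the "emp" table is borrowed from the Queries example in this document).

```perl
use strict;
use warnings;

# Inline, blocking lookup of a single row by an indexed key.
# The whole POE program waits while this runs, so keep it to fast queries.
sub employee_name {
    my ($dbh, $id) = @_;
    my ($name) = $dbh->selectrow_array(
        'select name from emp where id = ?',
        undef,    # no statement attributes
        $id,      # bind value for the single placeholder
    );
    return $name;    # undef when no row matches
}
```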
Return Values¶
The destination state in the destination session (specified in the call to
"query()") will receive the return values from the query in its
$_[ARG0] parameter. DBIAgent invokes DBI's "fetch" method
internally, so the value will be a reference to an array. If your query
returns multiple rows, then your state will be invoked multiple times, once
per row.
ADDITIONALLY, your state will be called one time with $_[ARG0]
containing the string 'EOF'. 'EOF' is returned even if the query
doesn't return any other rows. This is also what to expect for DML
(INSERT, UPDATE, DELETE) queries. A way to utilise this might be as follows:
sub some_state {
#...
if ($enough_values_to_begin_updating) {
$heap->{dbiagent}->query(update_values =>
this_session =>
update_next_value =>
shift @{$heap->{values_to_be_updated}}
);
}
}
sub update_next_value {
my ($self, $heap) = @_[OBJECT, HEAP];
# we got 'EOF' in ARG0 here but we don't care... we know that an
# update has been executed.
for (1..3) { # Do three at a time!
my $value;
last unless defined ($value = shift @{$heap->{values_to_be_updated}});
$heap->{dbiagent}->query(update_values =>
this_session =>
update_next_value =>
$value
);
}
}
new()¶
Creating an instance creates a POE::Session to manage communication with the
Helper processes. Queue management is transparent and automatic. The
constructor is named "new()" (surprised, eh? Yeah, me too). The
parameters are as follows:
- DSN
- An arrayref of parameters to pass to DBI->connect
(usually a dsn, username, and password).
- Queries
- A hashref of the form Query_Name => "$SQL".
For example:
{
sysdate => "select sysdate from dual",
employee_record => "select * from emp where id = ?",
increase_inventory => "update inventory
set count = count + ?
where item_id = ?",
}
As the example indicates, DBI placeholders are supported, as are DML
statements.
- Count
- The number of helper processes to spawn. Defaults to 3. The
optimal value for this parameter will depend on several factors, such as:
how many different queries your program will be running, how much RAM you
have, how often you run queries, and most importantly, how many queries
you intend to run simultaneously.
- ErrorState
- A listref containing a session and event name to receive
error messages from the DBI. The message arrives in ARG0.
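Putting the parameters above together, a constructor call might be prepared as in this sketch. "agent_args" and "format_db_error" are hypothetical helpers, and the session alias 'main' and event name 'db_error' are assumptions chosen for illustration.

```perl
use strict;
use warnings;

# Build the argument list for POE::Component::DBIAgent->new().
sub agent_args {
    my ($dsn, $user, $pass) = @_;
    return (
        DSN     => [ $dsn, $user, $pass ],    # passed on to DBI->connect
        Queries => {
            sysdate         => 'select sysdate from dual',
            employee_record => 'select * from emp where id = ?',
        },
        Count      => 3,                          # number of helper processes
        ErrorState => [ main => 'db_error' ],     # [ session, event ]
    );
}

# Body of the hypothetical 'db_error' state: the DBI message arrives in ARG0,
# so the state would call format_db_error($_[ARG0]).
sub format_db_error {
    my ($message) = @_;
    return "DBIAgent error: $message";
}

# Inside a session's _start state this would be used roughly as:
#   $heap->{helper} = POE::Component::DBIAgent->new( agent_args(@credentials) );
```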
query($query_name, [ \%args, ] $session, $state, [ @parameters ])¶
The "query()" method takes at least three parameters, plus any bind
values for the specific query you are executing.
- $query_name
- This parameter must be one of the keys to the Queries
hashref you passed to the constructor. It is used to indicate which query
you wish to execute.
- \%args
- This is an OPTIONAL hashref of arguments to pass to the
query.
Currently supported arguments:
- hash
- Return rows as hash references instead of array
references.
- cookie
- A cookie to pass to this query. This is passed back
unchanged to the destination state in $_[ARG1]. Can be any scalar
(including references, and even POE postbacks, so be careful!). You can
use this as an identifier if you have one destination state handling
multiple different queries or sessions.
- delay
- Insert a 1ms delay between each row of output.
I know what you're thinking: "WHY would you want to slow down query
responses?!?!?" It has to do with CONCURRENCY. When a response
(finally) comes in from the agent after running the query, it floods the
input channel with response data. This has the effect of monopolizing
POE's attention, so that any other handles (network sockets, pipes, file
descriptors) keep getting pushed further back on the queue, and to all
other processes EXCEPT the agent, your POE program looks hung for the
amount of time it takes to process all of the incoming query data.
So, we insert 1ms of time via Time::HiRes's "usleep" function. In
human terms, this is essentially negligible. But it is just enough time to
allow competing handles (sockets, files) to trigger "select()",
and get handled by the POE::Kernel, in situations where concurrency has
priority over transfer rate.
Naturally, the Time::HiRes module is required for this functionality. If
Time::HiRes is not installed, the delay is ignored.
- group
- Sends the return event back once for every "group" rows
retrieved from the database, rather than once per row, to avoid event spam
when selecting lots of rows. NB: using group means that $row will be an
arrayref of rows, not just a single row.
- $session, $state
- These parameters indicate the POE state that is to receive
the data returned from the database. The state indicated will receive the
data in its $_[ARG0] parameter. PLEASE make sure this is a valid
state, otherwise you will spend a LOT of time banging your head against
the wall wondering where your query data is.
- @parameters
- These are any parameters your query requires.
WARNING: You must supply exactly as many parameters as your query
has placeholders! This means that if your query has NO placeholders, then
you should pass NO extra parameters to "query".
Suggestions to improve this syntax are welcome.
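One way to guard against a placeholder mismatch is to check the bind count before calling "query()". This is only a sketch: "placeholder_count" and "checked_query" are hypothetical helpers, $queries is assumed to be the same hashref that was passed to the constructor as Queries, and the count is naive (a literal '?' inside a quoted SQL string would be miscounted).

```perl
use strict;
use warnings;

# Naive placeholder count: every '?' in the SQL text is counted.
sub placeholder_count {
    my ($sql) = @_;
    my $count = () = $sql =~ /\?/g;
    return $count;
}

# Validate the query name and bind count, then forward to $agent->query().
# $args is the optional hashref (hash, cookie, delay, group) or undef.
sub checked_query {
    my ($agent, $queries, $name, $args, $session, $state, @params) = @_;

    my $sql = $queries->{$name};
    die "unknown query '$name'\n" unless defined $sql;

    my $want = placeholder_count($sql);
    die "query '$name' needs $want bind value(s), got " . @params . "\n"
        unless $want == @params;

    return $agent->query($name, ($args ? $args : ()), $session, $state, @params);
}
```

Failing loudly at the call site is usually easier to debug than a query that never delivers its rows.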
finish()¶
The "finish()" method tells DBIAgent that the program is finished
sending queries. DBIAgent will shut its helpers down gracefully after they
complete any pending queries. If there are no pending queries, the DBIAgent
will shut down immediately.
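A shutdown state might call "finish()" as in the sketch below. "shutdown_agent" and the "finish_sent" flag are hypothetical; the flag simply avoids calling "finish()" twice, since this document does not say how a repeated call behaves. The agent is assumed to live in $heap->{helper}, as in the SYNOPSIS.

```perl
use strict;
use warnings;

# Ask the agent to shut down once all pending queries have completed.
# Returns 1 if finish() was sent, 0 if there was nothing to do.
sub shutdown_agent {
    my ($heap) = @_;
    return 0 if $heap->{finish_sent} || !$heap->{helper};

    $heap->{helper}->finish;
    $heap->{finish_sent} = 1;
    return 1;
}
```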
NOTES¶
- Error handling is practically non-existent.
- The calling syntax is still pretty weak... but improving.
We may eventually add an optional attributes hash so that each query can
be called with its own individual characteristics.
- I might eventually want to support returning hashrefs, if
there is any demand.
- Every query is prepared at Helper startup. This could
potentially be pretty expensive. Perhaps a cached or deferred loading
might be better? This is considering that not every helper is going to run
every query, especially if you have a lot of miscellaneous queries.
Suggestions welcome! Diffs more welcome! :-)
AUTHOR¶
This module has been fine-tuned and packaged by Rob Bloodgood
<robb@empire2.com>. However, most of the queuing code originated with
Fletch <fletch@phydeaux.org>, either directly or via his ideas. Thank
you for making this module a reality, Fletch!
However, I own all of the bugs.
This module is free software; you may redistribute it and/or modify it under the
same terms as Perl itself.