NAME
MCE::Queue - Hybrid queues (normal including priority) for Many-core Engine
VERSION
This document describes MCE::Queue version 1.517
SYNOPSIS
use MCE;
use MCE::Queue;
use List::MoreUtils qw(part); ## provides part(), used in the first task
my @dirs = (".");
my $D = MCE::Queue->new( queue => \@dirs );
my $F = MCE::Queue->new();
## Notice the use of dequeue_nb (non-blocking) for the initial
## task and dequeue for the task afterwards. The first task is
## recursive.
my $mce = MCE->new(
user_tasks => [{
max_workers => 4,
task_end => sub {
## Signal workers that no more work remains. The number 4
## is the number of workers for the 2nd task, which
## performs the read.
$F->enqueue((undef) x 4);
},
user_func => sub {
## Pause briefly to allow time for wid 1 to add items.
select(undef, undef, undef, 0.05) if (MCE->task_wid > 1);
## Worker will loop until no more directories.
while (defined (local $_ = $D->dequeue_nb)) {
my ($files, $dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");
$D->enqueue(@$dirs ) if defined $dirs;
$F->enqueue(@$files) if defined $files;
}
MCE->say("STDERR", "(D) worker has ended");
return;
}
},{
max_workers => 4,
user_func => sub {
## Worker will loop until no more files.
while (defined (local $_ = $F->dequeue)) {
MCE->say($_);
}
MCE->say("STDERR", "(F) worker has ended");
return;
}
}]
);
$mce->run;
DESCRIPTION
This module provides a queue interface supporting normal and priority queues,
utilizing the IPC engine behind MCE. Data resides under the manager process.
A worker may also create any number of local queues, which are not available
to other workers or to the manager process. Think of a CPU having L3 (shared)
and L1 (local) cache.
The structure of the MCE::Queue object is shown below. It allows normal queues
to run as fast as an array. Priority queues are nearly as fast: the only added
cost is a brief hash lookup to check whether the priority exists, plus adding
or removing the key. The heap array contains only priorities, not the data
itself, so heap order is managed only as necessary while running.
## Normal queue data
$_queue->{_datq} = [];
## Priority data { p1 => [ ], p2 => [ ], pN => [ ] }
$_queue->{_datp} = {};
## Priority heap [ pN, p2, p1 ] ## in heap order
## fyi, _datp will always dequeue before _datq
$_queue->{_heap} = [];
## Priority order (default)
$_queue->{_porder} = $MCE::Queue::HIGHEST;
## Priority type (default)
$_queue->{_type} = $MCE::Queue::FIFO;
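As a rough illustration of the structure above, the following sketch runs
MCE::Queue on its own (local mode, MCE not loaded) and uses only the public
methods documented later in this page. The item names and priority numbers are
made up for the example. It is meant to show two things: heap returns the
priorities only, and priority data drains before normal data.

```perl
use strict;
use warnings;
use MCE::Queue;   ## standalone (local) mode; MCE itself is not loaded

## Defaults apply: highest priority first, FIFO within each priority.
my $q = MCE::Queue->new();

$q->enqueue('n1', 'n2');        ## normal data
$q->enqueuep(5, 'a1', 'a2');    ## two items under priority 5
$q->enqueuep(9, 'b1');          ## one item under priority 9

my @heap  = $q->heap;           ## priorities only, never the items
my $count = $q->pending;        ## counts normal and priority items

## Drain the queue without blocking and record the order of arrival.
my @order;
while (defined(my $item = $q->dequeue_nb)) {
    push @order, $item;
}

print "heap: @heap\n";
print "pending before draining: $count\n";
print "dequeue order: @order\n";
```

With the default settings, the item under priority 9 is expected to come out
first, then the two items under priority 5, then the normal data.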
IMPORT
Two options are available for overriding the default value used when creating
new queues (porder applies to priority queues only).
use MCE::Queue porder => $MCE::Queue::HIGHEST,
type => $MCE::Queue::FIFO;
use MCE::Queue; ## same as above
porder => $HIGHEST = Highest priority items are dequeued first
$LOWEST = Lowest priority items are dequeued first
type => $FIFO = First in, first out
$LILO = (Synonym for FIFO)
$LIFO = Last in, first out
$FILO = (Synonym for LIFO)
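The effect of these settings can be sketched with small local queues. Note the
assumption: rather than changing the module-wide defaults at import time, this
example passes porder and type to new for each queue (new accepts both
options, as listed under API DOCUMENTATION). The values enqueued are
arbitrary.

```perl
use strict;
use warnings;
use MCE::Queue;   ## standalone (local) mode; defaults left untouched

## Same input, two queue types.
my $fifo = MCE::Queue->new( type => $MCE::Queue::FIFO );
my $lifo = MCE::Queue->new( type => $MCE::Queue::LIFO );
$_->enqueue(1, 2, 3) for $fifo, $lifo;

my @fifo_out = map { $fifo->dequeue_nb } 1 .. 3;   ## oldest item first
my @lifo_out = map { $lifo->dequeue_nb } 1 .. 3;   ## newest item first

## A priority queue which hands out the lowest priority first.
my $low = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
$low->enqueuep(7, 'seven');
$low->enqueuep(2, 'two');
my $first = $low->dequeue_nb;   ## item stored under the lowest priority

print "FIFO: @fifo_out\n";
print "LIFO: @lifo_out\n";
print "LOWEST first: $first\n";
```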
THREE RUN MODES
MCE::Queue can be utilized under the following conditions:
A) use MCE;            B) use MCE::Queue;     C) use MCE::Queue;
   use MCE::Queue;        use MCE;
- A) Loading MCE prior to inclusion of MCE::Queue
- The dequeue method blocks for both the manager process and the workers. All
data resides under the manager process. Workers send/request data through
IPC.
Creating a queue from the worker process will cause the queue to run in
local mode. The data resides under the worker process and is not available
to other workers or to the manager process.
- B) Loading MCE::Queue prior to inclusion of MCE
- Queues behave as if running in local mode, for both the manager and the
workers, for the duration of the script. I cannot think of a use-case for
this, but wanted to mention the behavior in the event MCE::Queue is loaded
prior to MCE.
- C) Loading MCE::Queue without MCE
- In this mode, the dequeue method is non-blocking. Queues behave as in
local mode, since MCE is not present. As with local queuing, this mode is
speedy due to minimum overhead and zero IPC.
Essentially, the MCE module is not a prerequisite for using MCE::Queue.
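A minimal sketch of mode C follows, assuming nothing beyond what is stated
above: MCE is never loaded, so dequeue is expected to return undef on an empty
queue instead of blocking. The two items are placeholders.

```perl
use strict;
use warnings;
use MCE::Queue;   ## mode C: no "use MCE" anywhere in the script

my $q = MCE::Queue->new( queue => [ 'a', 'b' ] );

## In this mode dequeue does not block, so the loop ends once the
## queue runs dry and dequeue returns undef.
my @seen;
while (defined(my $item = $q->dequeue)) {
    push @seen, $item;
}

print "drained: @seen\n";
print "left in queue: ", $q->pending, "\n";
```

The same loop under mode A would block on the final dequeue, which is why the
synopsis enqueues one undef per worker to signal the end of work.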
API DOCUMENTATION
- ->new ( [ queue => \@array ] )
- This creates a new queue. Available options are queue, porder, type, and
gather. The gather option is mainly for use with MCE, when items should be
passed to a callback function which adds them to the queue.
use MCE;
use MCE::Queue;
my $q1 = MCE::Queue->new();
my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );
my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );
Multiple queues may point to the same callback function. Please note that
the first argument for the callback function is the queue object itself.
sub _append {
my ($Q, @items) = @_;
$Q->enqueue(@items);
}
my $q7 = MCE::Queue->new( gather => \&_append );
my $q8 = MCE::Queue->new( gather => \&_append );
## Items are diverted to the gather callback function.
$q7->enqueue( 'apple', 'orange' );
The gather option is useful when wanting to temporarily store items in a
holding area until output order can be obtained. Although a queue is not
required to gather data in MCE, this is simply a demonstration of the
gather option in the context of a queue.
use MCE;
use MCE::Queue;
my ($_order_id, %_tmp);
sub _preserve_order {
my ($Q, $chunk_id, $result) = @_;
$_tmp{$chunk_id} = $result;
while (1) {
last unless exists $_tmp{$_order_id};
$Q->enqueue( $_tmp{$_order_id} );
delete $_tmp{$_order_id++};
}
return;
}
my @squares; my $q = MCE::Queue->new(
queue => \@squares, gather => \&_preserve_order
);
$_order_id = 1; ## The first chunk_id equals 1;
my $mce = MCE->new(
chunk_size => 1, input_data => [ 1 .. 100 ],
user_func => sub {
$q->enqueue( MCE->chunk_id, $_ * $_ );
}
);
$mce->run;
print "@squares\n";
- ->clear ( void )
- Clears the queue of any items, leaving it empty. Each queue comes with a
socket used for blocking behind the scenes, so always use the clear method
when wanting to clear the content of the array.
my @a; my $q = MCE::Queue->new( queue => \@a );
@a = (); ## no, the blocking socket may go out of sync
$q->clear; ## ok
- ->enqueue ( $item [, $item, ... ] )
- Appends a list of items onto the end of the normal queue.
- ->enqueuep ( $p, $item [, $item, ... ] )
- Appends a list of items onto the end of the priority queue, under
priority $p.
- ->dequeue ( [ $count ] )
- Returns the requested number of items (default is 1) from the queue.
Priority data will always dequeue first from the priority queue before any
data from the normal queue.
The method will block if the queue contains zero items. If the queue
contains fewer than the requested number of items, the method will not
block, but return the remaining items and undef for up to the count
requested.
The $count argument, used for requesting a number of items, is beneficial
when workers are passing parameters through the queue. For this release,
always remember to dequeue using the same multiple for the count (for
example, always in twos when items are enqueued as pairs). This differs
from Thread::Queue, which blocks until the requested number of items is
available.
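The advice about using the same multiple can be sketched as follows. This is
an illustration only, run in local mode without MCE; the (name, value) pairs
are invented for the example.

```perl
use strict;
use warnings;
use MCE::Queue;   ## standalone (local) mode

my $q = MCE::Queue->new();

## Parameters travel through the queue as (name, value) pairs.
$q->enqueue( 'alpha', 1, 'beta', 2 );

my @pairs;
while ($q->pending) {
    ## Always dequeue in twos so that a name and its value stay together.
    my ($name, $value) = $q->dequeue(2);
    push @pairs, "$name=$value";
}

print "@pairs\n";
```

Mixing counts, say dequeue(2) in one worker and dequeue(3) in another, would
split a pair across two calls, which is the situation the paragraph above
warns against.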
- ->dequeue_nb ( [ $count ] )
- Returns the requested number of items (default is 1) from the queue. Like
with dequeue, priority data will always dequeue first. This method is
non-blocking and will return undef in the absence of data from the
queue.
- ->insert ( $index, $item [, $item, ... ] )
- Adds the list of items to the queue at the specified index.
- ->insertp ( $p, $index, $item [, $item, ... ] )
- Adds the list of items to the queue for priority $p, at the specified
index.
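A short sketch of insert and insertp on FIFO queues in local mode. It assumes
that the index counts from the head of the queue starting at 0, in the manner
of Thread::Queue, and that priority 5 already holds items when insertp is
called. The values are arbitrary.

```perl
use strict;
use warnings;
use MCE::Queue;   ## standalone (local) mode

## Normal queue: place two items at index 1.
my $q = MCE::Queue->new( queue => [ 1, 2, 3, 4 ] );
$q->insert(1, 'foo', 'bar');
my @normal = map { $q->dequeue_nb } 1 .. $q->pending;

## Priority queue: place one item at index 1 under priority 5.
my $pq = MCE::Queue->new();
$pq->enqueuep(5, 'a', 'c');
$pq->insertp(5, 1, 'b');
my @prio = map { $pq->dequeue_nb } 1 .. $pq->pending;

print "normal: @normal\n";
print "priority 5: @prio\n";
```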
- ->pending ( void )
- Returns the number of items in the queue. This includes both normal and
priority data.
- ->peek ( [ $index ] )
- Returns an item from the normal queue, at the specified index, without
dequeuing anything. It defaults to the head of the queue if index is not
specified.
- ->peekp ( $p [, $index ] )
- Returns an item from the queue for priority $p, at the specified index,
without dequeuing anything. It defaults to the head of the queue if index
is not specified.
- ->peekh ( [ $index ] )
- Returns an item from the heap, at the specified index.
- ->heap ( void )
- Returns an array containing the heap data. Heap data consists of priority
numbers, not the data.
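The inspection methods above can be tried together in local mode. This is a
sketch with made-up items and priorities on a default queue (highest priority
first, FIFO); none of the calls is expected to remove anything.

```perl
use strict;
use warnings;
use MCE::Queue;   ## standalone (local) mode

my $q = MCE::Queue->new( queue => [ 'x', 'y' ] );
$q->enqueuep(3, 'low1', 'low2');
$q->enqueuep(8, 'high1');

my $head    = $q->peek;       ## head of the normal queue
my $second  = $q->peek(1);    ## next item in the normal queue
my $p3_head = $q->peekp(3);   ## head of the data under priority 3
my $top     = $q->peekh(0);   ## first priority in the heap
my @heap    = $q->heap;       ## all priorities, in heap order
my $pending = $q->pending;    ## unchanged by the calls above

print "peek: $head, $second\n";
print "peekp(3): $p3_head\n";
print "peekh(0): $top, heap: @heap\n";
print "pending: $pending\n";
```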
ACKNOWLEDGEMENTS
The main reason for writing MCE::Queue was to have a Thread::Queue-like module
for workers spawned as children. I was pleasantly surprised at the number of
modules on CPAN for queuing. What stood out immediately were all the priority
queues and heap queues, and the differences in whether FIFO/LIFO or
highest/lowest-first options are available. Hence, the reason for MCE::Queue
supporting both normal and priority queues.
The following provides a list of resources I've read in helping me create
MCE::Queue for MCE.
- POE::Queue::Array
- Two if statements were adopted for checking if the item belongs at the end
or head of the queue.
- List::Binary::Search
- After glancing over the bsearch_num_pos method for returning the best
insert position, a couple variations of that were in order for MCE::Queue
to accommodate the highest/lowest order routines.
- Heap-Priority, List::Priority
- At this point, I thought, why not have both normal queues and priority
queues be efficient? And with that in mind, also provide options to allow
folks to choose FIFO/LIFO and highest/lowest order for the queue. The
data structure in MCE::Queue is described above.
MCE workers also benefit from being able to create local queues not
available to other workers including the manager process. Hence, the
reason for the 3 run modes described at the beginning of this
document.
- Thread::Queue
- Being that MCE supports both children and threads, Thread::Queue was used
as a template for identifying and documenting the methods in MCE::Queue.
Although not 100% compatible, pay close attention to the dequeue method
when requesting the number of items to dequeue.
->enqueuep( $p, $item [, $item, ... ] ); ## Extension (p)
->enqueue( $item [, $item, ... ] );
->dequeue( [ $count ] ); ## Priority data dequeues first
->dequeue_nb( [ $count ] );
->pending(); ## Counts both normal/priority data
## in the queue
- Parallel-DataPipe
- The idea for the recursive synopsis used in this document came from
reading the example listed in this module's documentation.
INDEX
MCE
AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>
LICENSE
This program is free software; you can redistribute it and/or modify it under
the terms of either: the GNU General Public License as published by the Free
Software Foundation; or the Artistic License.
See <http://dev.perl.org/licenses/> for more information.