.\" Automatically generated by Pod::Man 4.10 (Pod::Simple 3.35) .\" .\" Standard preamble: .\" ======================================================================== .de Sp \" Vertical space (when we can't use .PP) .if t .sp .5v .if n .sp .. .de Vb \" Begin verbatim text .ft CW .nf .ne \\$1 .. .de Ve \" End verbatim text .ft R .fi .. .\" Set up some character translations and predefined strings. \*(-- will .\" give an unbreakable dash, \*(PI will give pi, \*(L" will give a left .\" double quote, and \*(R" will give a right double quote. \*(C+ will .\" give a nicer C++. Capital omega is used to do unbreakable dashes and .\" therefore won't be available. \*(C` and \*(C' expand to `' in nroff, .\" nothing in troff, for use with C<>. .tr \(*W- .ds C+ C\v'-.1v'\h'-1p'\s-2+\h'-1p'+\s0\v'.1v'\h'-1p' .ie n \{\ . ds -- \(*W- . ds PI pi . if (\n(.H=4u)&(1m=24u) .ds -- \(*W\h'-12u'\(*W\h'-12u'-\" diablo 10 pitch . if (\n(.H=4u)&(1m=20u) .ds -- \(*W\h'-12u'\(*W\h'-8u'-\" diablo 12 pitch . ds L" "" . ds R" "" . ds C` "" . ds C' "" 'br\} .el\{\ . ds -- \|\(em\| . ds PI \(*p . ds L" `` . ds R" '' . ds C` . ds C' 'br\} .\" .\" Escape single quotes in literal strings from groff's Unicode transform. .ie \n(.g .ds Aq \(aq .el .ds Aq ' .\" .\" If the F register is >0, we'll generate index entries on stderr for .\" titles (.TH), headers (.SH), subsections (.SS), items (.Ip), and index .\" entries marked with X<> in POD. Of course, you'll have to process the .\" output yourself in some meaningful fashion. .\" .\" Avoid warning from groff about undefined register 'F'. .de IX .. .nr rF 0 .if \n(.g .if rF .nr rF 1 .if (\n(rF:(\n(.g==0)) \{\ . if \nF \{\ . de IX . tm Index:\\$1\t\\n%\t"\\$2" .. . if !\nF==2 \{\ . nr % 0 . nr F 2 . \} . \} .\} .rr rF .\" ======================================================================== .\" .IX Title "MCE::Queue 3pm" .TH MCE::Queue 3pm "2019-01-30" "perl v5.28.1" "User Contributed Perl Documentation" .\" For nroff, turn off justification. 
Always turn off hyphenation; it makes .\" way too many mistakes in technical documents. .if n .ad l .nh .SH "NAME" MCE::Queue \- Hybrid (normal and priority) queues .SH "VERSION" .IX Header "VERSION" This document describes MCE::Queue version 1.838 .SH "SYNOPSIS" .IX Header "SYNOPSIS" .Vb 2 \& use MCE; \& use MCE::Queue; \& \& my $q = MCE::Queue\->new; \& \& $q\->enqueue( qw/ wherefore art thou romeo / ); \& \& my $item = $q\->dequeue; \& \& if ( $q\->pending ) { \& ; \& } .Ve .SH "DESCRIPTION" .IX Header "DESCRIPTION" This module provides a queue interface supporting normal and priority queues and utilizing the \s-1IPC\s0 engine behind \s-1MCE.\s0 Data resides under the manager process. Three options are available for overriding the default value for new queues. The porder option applies to priority queues only. .PP .Vb 3 \& use MCE::Queue porder => $MCE::Queue::HIGHEST, \& type => $MCE::Queue::FIFO, \& fast => 0; \& \& use MCE::Queue; # Same as above \& \& ## Possible values \& \& porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first \& $MCE::Queue::LOWEST # Lowest priority items dequeue first \& \& type => $MCE::Queue::FIFO # First in, first out \& $MCE::Queue::LIFO # Last in, first out \& $MCE::Queue::LILO # (Synonym for FIFO) \& $MCE::Queue::FILO # (Synonym for LIFO) .Ve .SH "DEMONSTRATION" .IX Header "DEMONSTRATION" MCE::Queue provides two run modes. .PP (A) The \f(CW\*(C`MCE::Queue\*(C'\fR object is constructed before running \s-1MCE.\s0 The data resides under the manager process. Workers send and request data via \s-1IPC.\s0 .PP (B) Workers might want to construct a queue for local access. In this mode, the data resides under the worker process and is not available to other workers including the manager process.
.PP .Vb 2 \& use MCE; \& use MCE::Queue; \& \& my $F = MCE::Queue\->new( fast => 1 ); \& my $consumers = 8; \& \& my $mce = MCE\->new( \& \& task_end => sub { \& my ($mce, $task_id, $task_name) = @_; \& $F\->end() if $task_name eq \*(Aqdir\*(Aq; \& }, \& \& user_tasks => [{ \& max_workers => 1, task_name => \*(Aqdir\*(Aq, \& \& user_func => sub { \& ## Create a "standalone queue" only accessible to this worker. \& my $D = MCE::Queue\->new(queue => [ MCE\->user_args\->[0] ]); \& \& while (defined (my $dir = $D\->dequeue_nb)) { \& my (@files, @dirs); foreach (glob("$dir/*")) { \& if (\-d $_) { push @dirs, $_; next; } \& push @files, $_; \& } \& $D\->enqueue(@dirs ) if scalar @dirs; \& $F\->enqueue(@files) if scalar @files; \& } \& } \& },{ \& max_workers => $consumers, task_name => \*(Aqfile\*(Aq, \& \& user_func => sub { \& while (defined (my $file = $F\->dequeue)) { \& MCE\->say($file); \& } \& } \& }] \& \& )\->run({ user_args => [ $ARGV[0] || \*(Aq.\*(Aq ] }); \& \& _\|_END_\|_ \& \& Results taken from files_mce.pl and files_thr.pl on the web. \& https://github.com/marioroy/mce\-examples/tree/master/other \& \& Usage: \& time ./files_mce.pl /usr 0 | wc \-l \& time ./files_mce.pl /usr 1 | wc \-l \& time ./files_thr.pl /usr | wc \-l \& \& Darwin (OS) /usr: 216,271 files \& MCE::Queue, fast => 0 : 4.17s \& MCE::Queue, fast => 1 : 2.62s \& Thread::Queue : 4.14s \& \& Linux (VM) /usr: 186,154 files \& MCE::Queue, fast => 0 : 12.57s \& MCE::Queue, fast => 1 : 3.36s \& Thread::Queue : 5.91s \& \& Solaris (VM) /usr: 603,051 files \& MCE::Queue, fast => 0 : 39.04s \& MCE::Queue, fast => 1 : 18.08s \& Thread::Queue * Perl not built to support threads .Ve .SH "API DOCUMENTATION" .IX Header "API DOCUMENTATION" .SS "MCE::Queue\->new ( [ queue => \e@array, await => 1, fast => 1 ] )" .IX Subsection "MCE::Queue->new ( [ queue => @array, await => 1, fast => 1 ] )" This creates a new queue. Available options are queue, porder, type, await, barrier, fast, and gather. 
.PP .Vb 2 \& use MCE; \& use MCE::Queue; \& \& my $q1 = MCE::Queue\->new(); \& my $q2 = MCE::Queue\->new( queue => [ 0, 1, 2 ] ); \& \& my $q3 = MCE::Queue\->new( porder => $MCE::Queue::HIGHEST ); \& my $q4 = MCE::Queue\->new( porder => $MCE::Queue::LOWEST ); \& \& my $q5 = MCE::Queue\->new( type => $MCE::Queue::FIFO ); \& my $q6 = MCE::Queue\->new( type => $MCE::Queue::LIFO ); \& \& my $q7 = MCE::Queue\->new( await => 1, barrier => 0 ); \& my $q8 = MCE::Queue\->new( fast => 1 ); .Ve .PP The \f(CW\*(C`await\*(C'\fR option, when enabled, allows workers to block (semaphore-like) until the number of items pending is equal to or less than a threshold value. The \f(CW$q\fR\->await method is described below. .PP On Unix platforms, \f(CW\*(C`barrier\*(C'\fR mode (enabled by default) prevents many workers from dequeuing simultaneously to lessen overhead for the \s-1OS\s0 kernel. Specify 0 to disable barrier mode and not allocate sockets. The barrier option has no effect if constructing the queue inside a thread or enabling \f(CW\*(C`fast\*(C'\fR. .PP The \f(CW\*(C`fast\*(C'\fR option speeds up dequeues and is not enabled by default. It is beneficial for queues not calling (\->dequeue_nb) and not altering the count value while running; e.g. \->dequeue($count). .PP The \f(CW\*(C`gather\*(C'\fR option is mainly for running with \s-1MCE\s0 and wanting to pass item(s) to a callback function for appending to the queue. Multiple queues may point to the same callback function. The callback receives the queue object as the first argument and items after it. .PP .Vb 4 \& sub _append { \& my ($q, @items) = @_; \& $q\->enqueue(@items); \& } \& \& my $q7 = MCE::Queue\->new( gather => \e&_append ); \& my $q8 = MCE::Queue\->new( gather => \e&_append ); \& \& ## Items are diverted to the callback function, not the queue. 
\& $q7\->enqueue( \*(Aqapple\*(Aq, \*(Aqorange\*(Aq ); .Ve .PP Specifying the \f(CW\*(C`gather\*(C'\fR option allows one to store items temporarily while ensuring output order. Although a queue object is not required, this is simply a demonstration of the gather option in the context of a queue. .PP .Vb 2 \& use MCE; \& use MCE::Queue; \& \& sub preserve_order { \& my %tmp; my $order_id = 1; \& \& return sub { \& my ($q, $chunk_id, $data) = @_; \& $tmp{$chunk_id} = $data; \& \& while (1) { \& last unless exists $tmp{$order_id}; \& $q\->enqueue( delete $tmp{$order_id++} ); \& } \& \& return; \& }; \& } \& \& my @squares; my $q = MCE::Queue\->new( \& queue => \e@squares, gather => preserve_order \& ); \& \& my $mce = MCE\->new( \& chunk_size => 1, input_data => [ 1 .. 100 ], \& user_func => sub { \& $q\->enqueue( MCE\->chunk_id, $_ * $_ ); \& } \& ); \& \& $mce\->run; \& \& print "@squares\en"; .Ve .ie n .SS "$q\->await ( $pending_threshold )" .el .SS "\f(CW$q\fP\->await ( \f(CW$pending_threshold\fP )" .IX Subsection "$q->await ( $pending_threshold )" The await method is beneficial when wanting to throttle worker(s) appending to the queue. Perhaps, consumers are running a bit behind and wanting to keep tabs on memory consumption. Below, the number of items pending will never go above 20. .PP .Vb 1 \& use Time::HiRes qw( sleep ); \& \& use MCE::Flow; \& use MCE::Queue; \& \& my $q = MCE::Queue\->new( await => 1, fast => 1 ); \& my ( $producers, $consumers ) = ( 1, 8 ); \& \& mce_flow { \& task_name => [ \*(Aqproducer\*(Aq, \*(Aqconsumer\*(Aq ], \& max_workers => [ $producers, $consumers ], \& }, \& sub { \& ## producer \& for my $item ( 1 .. 
100 ) { \& $q\->enqueue($item); \& \& ## blocks until the # of items pending reaches <= 10 \& if ($item % 10 == 0) { \& MCE\->say( \*(Aqpending: \*(Aq.$q\->pending() ); \& $q\->await(10); \& } \& } \& \& ## notify consumers no more work \& $q\->end(); \& \& }, \& sub { \& ## consumers \& while (defined (my $next = $q\->dequeue())) { \& MCE\->say( MCE\->task_wid().\*(Aq: \*(Aq.$next ); \& sleep 0.100; \& } \& }; .Ve .ie n .SS "$q\->clear ( void )" .el .SS "\f(CW$q\fP\->clear ( void )" .IX Subsection "$q->clear ( void )" Clears the queue of any items. This has the effect of nulling the queue and the socket used for blocking. .PP .Vb 1 \& my @a; my $q = MCE::Queue\->new( queue => \e@a ); \& \& @a = (); ## bad, the blocking socket may become out of sync \& $q\->clear; ## ok .Ve .ie n .SS "$q\->end ( void )" .el .SS "\f(CW$q\fP\->end ( void )" .IX Subsection "$q->end ( void )" Stops the queue from receiving more items. Any worker blocking on \f(CW\*(C`dequeue\*(C'\fR will be unblocked automatically. Subsequent calls to \f(CW\*(C`dequeue\*(C'\fR will behave like \f(CW\*(C`dequeue_nb\*(C'\fR. Current \s-1API\s0 available since \s-1MCE 1.818.\s0 .PP .Vb 1 \& $q\->end(); .Ve .PP \&\s-1MCE\s0 Models (e.g. MCE::Flow) may persist between runs. In that case, one might want to enqueue \f(CW\*(C`undef\*(C'\fR's versus calling \f(CW\*(C`end\*(C'\fR. The number of \f(CW\*(C`undef\*(C'\fR's depends on how many items workers dequeue at a time. .PP .Vb 3 \& $q\->enqueue((undef) x ($N_workers * 1)); # $q\->dequeue() 1 item \& $q\->enqueue((undef) x ($N_workers * 2)); # $q\->dequeue(2) 2 items \& $q\->enqueue((undef) x ($N_workers * N)); # $q\->dequeue(N) N items .Ve .ie n .SS "$q\->enqueue ( $item [, $item, ... ] )" .el .SS "\f(CW$q\fP\->enqueue ( \f(CW$item\fP [, \f(CW$item\fP, ... ] )" .IX Subsection "$q->enqueue ( $item [, $item, ... ] )" Appends a list of items onto the end of the normal queue. 
.PP .Vb 2 \& $q\->enqueue( \*(Aqfoo\*(Aq ); \& $q\->enqueue( \*(Aqbar\*(Aq, \*(Aqbaz\*(Aq ); .Ve .ie n .SS "$q\->enqueuep ( $p, $item [, $item, ... ] )" .el .SS "\f(CW$q\fP\->enqueuep ( \f(CW$p\fP, \f(CW$item\fP [, \f(CW$item\fP, ... ] )" .IX Subsection "$q->enqueuep ( $p, $item [, $item, ... ] )" Appends a list of items onto the end of the priority queue with priority. .PP .Vb 2 \& $q\->enqueuep( $priority, \*(Aqfoo\*(Aq ); \& $q\->enqueuep( $priority, \*(Aqbar\*(Aq, \*(Aqbaz\*(Aq ); .Ve .ie n .SS "$q\->dequeue ( [ $count ] )" .el .SS "\f(CW$q\fP\->dequeue ( [ \f(CW$count\fP ] )" .IX Subsection "$q->dequeue ( [ $count ] )" Returns the requested number of items (default 1) from the queue. Priority data will always dequeue first before any data from the normal queue. .PP .Vb 2 \& $q\->dequeue( 2 ); \& $q\->dequeue; # default 1 .Ve .PP The method will block if the queue contains zero items. If the queue contains fewer than the requested number of items, the method will not block, but return whatever items there are on the queue. .PP The \f(CW$count\fR, used for requesting the number of items, is beneficial when workers are passing parameters through the queue. For this reason, always remember to dequeue using the same multiple for the count. This is unlike Thread::Queue which will block until the requested number of items are available. .PP .Vb 5 \& # MCE::Queue 1.820 and prior releases \& while ( my @items = $q\->dequeue(2) ) { \& last unless ( defined $items[0] ); \& ... \& } \& \& # MCE::Queue 1.821 and later \& while ( my @items = $q\->dequeue(2) ) { \& ... \& } .Ve .ie n .SS "$q\->dequeue_nb ( [ $count ] )" .el .SS "\f(CW$q\fP\->dequeue_nb ( [ \f(CW$count\fP ] )" .IX Subsection "$q->dequeue_nb ( [ $count ] )" Returns the requested number of items (default 1) from the queue. Like with dequeue, priority data will always dequeue first. This method is non-blocking and returns \f(CW\*(C`undef\*(C'\fR in the absence of data.
.PP .Vb 2 \& $q\->dequeue_nb( 2 ); \& $q\->dequeue_nb; # default 1 .Ve .ie n .SS "$q\->insert ( $index, $item [, $item, ... ] )" .el .SS "\f(CW$q\fP\->insert ( \f(CW$index\fP, \f(CW$item\fP [, \f(CW$item\fP, ... ] )" .IX Subsection "$q->insert ( $index, $item [, $item, ... ] )" Adds the list of items to the queue at the specified index position (0 is the head of the list). The head of the queue is that item which would be removed by a call to dequeue. .PP .Vb 4 \& $q = MCE::Queue\->new( type => $MCE::Queue::FIFO ); \& $q\->enqueue(1, 2, 3, 4); \& $q\->insert(1, \*(Aqfoo\*(Aq, \*(Aqbar\*(Aq); \& # Queue now contains: 1, foo, bar, 2, 3, 4 \& \& $q = MCE::Queue\->new( type => $MCE::Queue::LIFO ); \& $q\->enqueue(1, 2, 3, 4); \& $q\->insert(1, \*(Aqfoo\*(Aq, \*(Aqbar\*(Aq); \& # Queue now contains: 1, 2, 3, \*(Aqfoo\*(Aq, \*(Aqbar\*(Aq, 4 .Ve .ie n .SS "$q\->insertp ( $p, $index, $item [, $item, ... ] )" .el .SS "\f(CW$q\fP\->insertp ( \f(CW$p\fP, \f(CW$index\fP, \f(CW$item\fP [, \f(CW$item\fP, ... ] )" .IX Subsection "$q->insertp ( $p, $index, $item [, $item, ... ] )" Adds the list of items to the queue at the specified index position with priority. The behavior is similar to \f(CW\*(C`$q\->insert\*(C'\fR otherwise. .ie n .SS "$q\->pending ( void )" .el .SS "\f(CW$q\fP\->pending ( void )" .IX Subsection "$q->pending ( void )" Returns the number of items in the queue. The count includes both normal and priority data. Returns \f(CW\*(C`undef\*(C'\fR if the queue has been ended, and there are no more items in the queue. .PP .Vb 3 \& $q = MCE::Queue\->new(); \& $q\->enqueuep(5, \*(Aqfoo\*(Aq, \*(Aqbar\*(Aq); \& $q\->enqueue(\*(Aqsunny\*(Aq, \*(Aqday\*(Aq); \& \& print $q\->pending(), "\en"; \& # Output: 4 .Ve .ie n .SS "$q\->peek ( [ $index ] )" .el .SS "\f(CW$q\fP\->peek ( [ \f(CW$index\fP ] )" .IX Subsection "$q->peek ( [ $index ] )" Returns an item from the normal queue, at the specified index, without dequeuing anything.
It defaults to the head of the queue if index is not specified. The head of the queue is that item which would be removed by a call to dequeue. Negative index values are supported, similarly to arrays. .PP .Vb 2 \& $q = MCE::Queue\->new( type => $MCE::Queue::FIFO ); \& $q\->enqueue(1, 2, 3, 4, 5); \& \& print $q\->peek(1), \*(Aq \*(Aq, $q\->peek(\-2), "\en"; \& # Output: 2 4 \& \& $q = MCE::Queue\->new( type => $MCE::Queue::LIFO ); \& $q\->enqueue(1, 2, 3, 4, 5); \& \& print $q\->peek(1), \*(Aq \*(Aq, $q\->peek(\-2), "\en"; \& # Output: 4 2 .Ve .ie n .SS "$q\->peekp ( $p [, $index ] )" .el .SS "\f(CW$q\fP\->peekp ( \f(CW$p\fP [, \f(CW$index\fP ] )" .IX Subsection "$q->peekp ( $p [, $index ] )" Returns an item from the queue with priority, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified. The behavior is similar to \f(CW\*(C`$q\->peek\*(C'\fR otherwise. .ie n .SS "$q\->peekh ( [ $index ] )" .el .SS "\f(CW$q\fP\->peekh ( [ \f(CW$index\fP ] )" .IX Subsection "$q->peekh ( [ $index ] )" Returns an item from the head of the heap or at the specified index. .PP .Vb 4 \& $q = MCE::Queue\->new( porder => $MCE::Queue::HIGHEST ); \& $q\->enqueuep(5, \*(Aqfoo\*(Aq); \& $q\->enqueuep(6, \*(Aqbar\*(Aq); \& $q\->enqueuep(4, \*(Aqsun\*(Aq); \& \& print $q\->peekh(0), "\en"; \& # Output: 6 \& \& $q = MCE::Queue\->new( porder => $MCE::Queue::LOWEST ); \& $q\->enqueuep(5, \*(Aqfoo\*(Aq); \& $q\->enqueuep(6, \*(Aqbar\*(Aq); \& $q\->enqueuep(4, \*(Aqsun\*(Aq); \& \& print $q\->peekh(0), "\en"; \& # Output: 4 .Ve .ie n .SS "$q\->heap ( void )" .el .SS "\f(CW$q\fP\->heap ( void )" .IX Subsection "$q->heap ( void )" Returns an array containing the heap data. Heap data consists of priority numbers, not the data.
.PP .Vb 2 \& @h = $q\->heap; # $MCE::Queue::HIGHEST \& # Heap contains: 6, 5, 4 \& \& @h = $q\->heap; # $MCE::Queue::LOWEST \& # Heap contains: 4, 5, 6 .Ve .SH "ACKNOWLEDGMENTS" .IX Header "ACKNOWLEDGMENTS" .IP "List::BinarySearch" 3 .IX Item "List::BinarySearch" The bsearch_num_pos method was helpful for accommodating the highest and lowest order in MCE::Queue. .IP "POE::Queue::Array" 3 .IX Item "POE::Queue::Array" For extra optimization, two if statements were adopted for checking if the item belongs at the end or head of the queue. .IP "List::Priority" 3 .IX Item "List::Priority" MCE::Queue supports both normal and priority queues. .IP "Thread::Queue" 3 .IX Item "Thread::Queue" Thread::Queue is used as a template for identifying and documenting the methods. .Sp MCE::Queue is not fully compatible due to supporting normal and priority queues simultaneously; e.g. .Sp .Vb 2 \& $q\->enqueue( $item [, $item, ... ] ); # normal queue \& $q\->enqueuep( $p, $item [, $item, ... ] ); # priority queue \& \& $q\->dequeue( [ $count ] ); # priority data dequeues first \& $q\->dequeue_nb( [ $count ] ); \& \& $q\->pending(); # counts both normal/priority queues .Ve .IP "Parallel::DataPipe" 3 .IX Item "Parallel::DataPipe" The recursion example, in the demonstration above, was largely adopted from this module. .SH "INDEX" .IX Header "INDEX" \&\s-1MCE\s0, MCE::Core .SH "AUTHOR" .IX Header "AUTHOR" Mario E. Roy,