.\" Automatically generated by Pod::Man 4.14 (Pod::Simple 3.43) .\" .\" Standard preamble: .\" ======================================================================== .de Sp \" Vertical space (when we can't use .PP) .if t .sp .5v .if n .sp .. .de Vb \" Begin verbatim text .ft CW .nf .ne \\$1 .. .de Ve \" End verbatim text .ft R .fi .. .\" Set up some character translations and predefined strings. \*(-- will .\" give an unbreakable dash, \*(PI will give pi, \*(L" will give a left .\" double quote, and \*(R" will give a right double quote. \*(C+ will .\" give a nicer C++. Capital omega is used to do unbreakable dashes and .\" therefore won't be available. \*(C` and \*(C' expand to `' in nroff, .\" nothing in troff, for use with C<>. .tr \(*W- .ds C+ C\v'-.1v'\h'-1p'\s-2+\h'-1p'+\s0\v'.1v'\h'-1p' .ie n \{\ . ds -- \(*W- . ds PI pi . if (\n(.H=4u)&(1m=24u) .ds -- \(*W\h'-12u'\(*W\h'-12u'-\" diablo 10 pitch . if (\n(.H=4u)&(1m=20u) .ds -- \(*W\h'-12u'\(*W\h'-8u'-\" diablo 12 pitch . ds L" "" . ds R" "" . ds C` "" . ds C' "" 'br\} .el\{\ . ds -- \|\(em\| . ds PI \(*p . ds L" `` . ds R" '' . ds C` . ds C' 'br\} .\" .\" Escape single quotes in literal strings from groff's Unicode transform. .ie \n(.g .ds Aq \(aq .el .ds Aq ' .\" .\" If the F register is >0, we'll generate index entries on stderr for .\" titles (.TH), headers (.SH), subsections (.SS), items (.Ip), and index .\" entries marked with X<> in POD. Of course, you'll have to process the .\" output yourself in some meaningful fashion. .\" .\" Avoid warning from groff about undefined register 'F'. .de IX .. .nr rF 0 .if \n(.g .if rF .nr rF 1 .if (\n(rF:(\n(.g==0)) \{\ . if \nF \{\ . de IX . tm Index:\\$1\t\\n%\t"\\$2" .. . if !\nF==2 \{\ . nr % 0 . nr F 2 . \} . \} .\} .rr rF .\" ======================================================================== .\" .IX Title "MCE::Stream 3pm" .TH MCE::Stream 3pm "2023-09-29" "perl v5.36.0" "User Contributed Perl Documentation" .\" For nroff, turn off justification. 
Always turn off hyphenation; it makes .\" way too many mistakes in technical documents. .if n .ad l .nh .SH "NAME" MCE::Stream \- Parallel stream model for chaining multiple maps and greps .SH "VERSION" .IX Header "VERSION" This document describes MCE::Stream version 1.889 .SH "SYNOPSIS" .IX Header "SYNOPSIS" .Vb 2 \& ## Exports mce_stream, mce_stream_f, mce_stream_s \& use MCE::Stream; \& \& my (@m1, @m2, @m3); \& \& ## Default mode is map and processed from right\-to\-left \& @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000; \& mce_stream \e@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000; \& \& ## Native Perl \& @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000; \& \& ## Streaming grep and map in parallel \& mce_stream \e@m3, \& { mode => \*(Aqmap\*(Aq, code => sub { $_ * $_ } }, \& { mode => \*(Aqgrep\*(Aq, code => sub { $_ % 5 == 0 } }, 1..10000; \& \& ## Array or array_ref \& my @a = mce_stream sub { $_ * $_ }, 1..10000; \& my @b = mce_stream sub { $_ * $_ }, \e@list; \& \& ## Important; pass an array_ref for deeply input data \& my @c = mce_stream sub { $_\->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... 
]; \& my @d = mce_stream sub { $_\->[1] *= 2; $_ }, \e@deeply_list; \& \& ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref \& ## Workers read directly and do not involve the manager process \& my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file"; # efficient \& \& ## Involves the manager process, therefore slower \& my @f = mce_stream_f sub { chomp; $_ }, $file_handle; \& my @g = mce_stream_f sub { chomp; $_ }, $io; \& my @h = mce_stream_f sub { chomp; $_ }, \e$scalar; \& \& ## Sequence of numbers (begin, end [, step, format]) \& my @i = mce_stream_s sub { $_ * $_ }, 1, 10000, 5; \& my @j = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ]; \& \& my @k = mce_stream_s sub { $_ * $_ }, { \& begin => 1, end => 10000, step => 5, format => undef \& }; .Ve .SH "DESCRIPTION" .IX Header "DESCRIPTION" This module allows one to stream multiple map and/or grep operations in parallel. Code blocks run simultaneously from right-to-left. The results are appended immediately when providing a reference to an array. .PP .Vb 3 \& ## Appends are serialized, even out\-of\-order ok, but immediately. \& ## Out\-of\-order chunks are held temporarily until ordered chunks \& ## arrive. \& \& mce_stream \e@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000; \& \& ## input \& ## chunk1 input \& ## chunk3 chunk2 input \& ## chunk2 chunk2 chunk3 input \& ## append1 chunk3 chunk1 chunk4 input \& ## append2 chunk1 chunk5 chunk5 input \& ## append3 chunk5 chunk4 chunk6 ... \& ## append4 chunk4 chunk6 ... \& ## append5 chunk6 ... \& ## append6 ... \& ## ... \& ## .Ve .PP \&\s-1MCE\s0 incurs a small overhead due to passing of data. A fast code block will run faster natively when chaining multiple map functions. However, the overhead will likely diminish as the complexity of the code increases.
.PP .Vb 2 \& ## 0.334 secs \-\- baseline using the native map function \& my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000; \& \& ## 0.427 secs \-\- this is quite amazing considering data passing \& my @m2 = mce_stream \& sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000; \& \& ## 0.355 secs \-\- appends to @m3 immediately, not after running \& my @m3; mce_stream \e@m3, \& sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000; .Ve .PP Even faster is mce_stream_s; useful when input data is a range of numbers. Workers generate sequences mathematically among themselves without any interaction from the manager process. Two arguments are required for mce_stream_s (begin, end). Step defaults to 1 if begin is smaller than end, otherwise \-1. .PP .Vb 3 \& ## 0.278 secs \-\- numbers are generated mathematically via sequence \& my @m4; mce_stream_s \e@m4, \& sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000; .Ve .SH "OVERRIDING DEFAULTS" .IX Header "OVERRIDING DEFAULTS" The following lists the options that may be overridden when loading the module. The fast option is obsolete from 1.867 onwards and is ignored if specified. .PP .Vb 3 \& use Sereal qw( encode_sereal decode_sereal ); \& use CBOR::XS qw( encode_cbor decode_cbor ); \& use JSON::XS qw( encode_json decode_json ); \& \& use MCE::Stream \& max_workers => 8, # Default \*(Aqauto\*(Aq \& chunk_size => 500, # Default \*(Aqauto\*(Aq \& tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir \& freeze => \e&encode_sereal, # \e&Storable::freeze \& thaw => \e&decode_sereal, # \e&Storable::thaw \& init_relay => 0, # Default undef; MCE 1.882+ \& use_threads => 0, # Default undef; MCE 1.882+ \& default_mode => \*(Aqgrep\*(Aq, # Default \*(Aqmap\*(Aq \& ; .Ve .PP From \s-1MCE 1.8\s0 onwards, Sereal 3.015+ is loaded automatically if available. Specify \f(CW\*(C`Sereal => 0\*(C'\fR to use Storable instead.
.PP .Vb 1 \& use MCE::Stream Sereal => 0; .Ve .SH "CUSTOMIZING MCE" .IX Header "CUSTOMIZING MCE" .IP "MCE::Stream\->init ( options )" 3 .IX Item "MCE::Stream->init ( options )" .PD 0 .IP "MCE::Stream::init { options }" 3 .IX Item "MCE::Stream::init { options }" .PD .PP The init function accepts a hash of \s-1MCE\s0 options. The gather and bounds_only options, if specified, are ignored due to being used internally by the module (not shown below). .PP .Vb 1 \& use MCE::Stream; \& \& MCE::Stream\->init( \& chunk_size => 1, max_workers => 4, \& \& user_begin => sub { \& print "## ", MCE\->wid, " started\en"; \& }, \& \& user_end => sub { \& print "## ", MCE\->wid, " completed\en"; \& } \& ); \& \& my @a = mce_stream sub { $_ * $_ }, 1..100; \& \& print "\en", "@a", "\en"; \& \& \-\- Output \& \& ## 1 started \& ## 2 started \& ## 3 started \& ## 4 started \& ## 3 completed \& ## 1 completed \& ## 2 completed \& ## 4 completed \& \& 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361 \& 400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156 \& 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209 \& 2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600 \& 3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329 \& 5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396 \& 7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801 \& 10000 .Ve .PP Like with MCE::Stream\->init above, \s-1MCE\s0 options may be specified using an anonymous hash for the first argument. Notice how both max_workers and task_name can take an anonymous array for setting values uniquely per each code block. .PP Remember that MCE::Stream processes from right-to-left when setting the individual values. 
.PP .Vb 1 \& use MCE::Stream; \& \& my @a = mce_stream { \& task_name => [ \*(Aqc\*(Aq, \*(Aqb\*(Aq, \*(Aqa\*(Aq ], \& max_workers => [ 2, 4, 3, ], \& \& user_end => sub { \& my ($mce, $task_id, $task_name) = @_; \& print "$task_id \- $task_name completed\en"; \& }, \& \& task_end => sub { \& my ($mce, $task_id, $task_name) = @_; \& MCE\->print("$task_id \- $task_name ended\en"); \& } \& }, \& sub { $_ * 4 }, ## 2 workers, named c \& sub { $_ * 3 }, ## 4 workers, named b \& sub { $_ * 2 }, 1..10000; ## 3 workers, named a \& \& \-\- Output \& \& 0 \- a completed \& 0 \- a completed \& 0 \- a completed \& 0 \- a ended \& 1 \- b completed \& 1 \- b completed \& 1 \- b completed \& 1 \- b completed \& 1 \- b ended \& 2 \- c completed \& 2 \- c completed \& 2 \- c ended .Ve .PP Note that the anonymous hash, for specifying options, also comes first when passing an array reference. .PP .Vb 3 \& my @a; mce_stream { \& ... \& }, \e@a, sub { ... }, sub { ... }, 1..10000; .Ve .SH "API DOCUMENTATION" .IX Header "API DOCUMENTATION" Scripts using MCE::Stream can be written using the long or short form. The long form becomes relevant when mixing modes. Again, processing occurs from right-to-left. .PP .Vb 3 \& my @m3 = mce_stream \& { mode => \*(Aqmap\*(Aq, code => sub { $_ * $_ } }, \& { mode => \*(Aqgrep\*(Aq, code => sub { $_ % 5 == 0 } }, 1..10000; \& \& my @m4; mce_stream \e@m4, \& { mode => \*(Aqmap\*(Aq, code => sub { $_ * $_ } }, \& { mode => \*(Aqgrep\*(Aq, code => sub { $_ % 5 == 0 } }, 1..10000; .Ve .PP For multiple grep blocks, the short form can be used. Simply specify the default mode for the module. The two valid values for default_mode are 'grep' and 'map'. .PP .Vb 1 \& use MCE::Stream default_mode => \*(Aqgrep\*(Aq; \& \& my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file; .Ve .PP The following assumes 'map' for default_mode in order to demonstrate all the possibilities for providing input data.
.IP "MCE::Stream\->run ( sub { code }, list )" 3 .IX Item "MCE::Stream->run ( sub { code }, list )" .PD 0 .IP "mce_stream sub { code }, list" 3 .IX Item "mce_stream sub { code }, list" .PD .PP Input data may be defined using a list or an array reference. Unlike MCE::Loop, Flow, and Step, specifying a hash reference as input data isn't allowed. .PP .Vb 3 \& ## Array or array_ref \& my @a = mce_stream sub { $_ * 2 }, 1..1000; \& my @b = mce_stream sub { $_ * 2 }, \e@list; \& \& ## Important; pass an array_ref for deeply input data \& my @c = mce_stream sub { $_\->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ]; \& my @d = mce_stream sub { $_\->[1] *= 2; $_ }, \e@deeply_list; \& \& ## Not supported \& my @z = mce_stream sub { ... }, \e%hash; .Ve .IP "MCE::Stream\->run_file ( sub { code }, file )" 3 .IX Item "MCE::Stream->run_file ( sub { code }, file )" .PD 0 .IP "mce_stream_f sub { code }, file" 3 .IX Item "mce_stream_f sub { code }, file" .PD .PP The fastest of these is the /path/to/file. Workers communicate the next offset position among themselves with zero interaction by the manager process. .PP \&\f(CW\*(C`IO::All\*(C'\fR { File, Pipe, \s-1STDIO\s0 } is supported since \s-1MCE 1.845.\s0 .PP .Vb 4 \& my @c = mce_stream_f sub { chomp; $_ . "\er\en" }, "/path/to/file"; # faster \& my @d = mce_stream_f sub { chomp; $_ . "\er\en" }, $file_handle; \& my @e = mce_stream_f sub { chomp; $_ . "\er\en" }, $io; # IO::All \& my @f = mce_stream_f sub { chomp; $_ . 
"\er\en" }, \e$scalar; .Ve .ie n .IP "MCE::Stream\->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )" 3 .el .IP "MCE::Stream\->run_seq ( sub { code }, \f(CW$beg\fR, \f(CW$end\fR [, \f(CW$step\fR, \f(CW$fmt\fR ] )" 3 .IX Item "MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )" .PD 0 .ie n .IP "mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]" 3 .el .IP "mce_stream_s sub { code }, \f(CW$beg\fR, \f(CW$end\fR [, \f(CW$step\fR, \f(CW$fmt\fR ]" 3 .IX Item "mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]" .PD .PP Sequence may be defined as a list, an array reference, or a hash reference. The functions require both begin and end values to run. Step and format are optional. The format is passed to sprintf (% may be omitted below). .PP .Vb 1 \& my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f"); \& \& my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt; \& my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ]; \& \& my @h = mce_stream_s sub { $_ }, { \& begin => $beg, end => $end, step => $step, format => $fmt \& }; .Ve .IP "MCE::Stream\->run ( { input_data => iterator }, sub { code } )" 3 .IX Item "MCE::Stream->run ( { input_data => iterator }, sub { code } )" .PD 0 .IP "mce_stream { input_data => iterator }, sub { code }" 3 .IX Item "mce_stream { input_data => iterator }, sub { code }" .PD .PP An iterator reference may be specified for input_data. The only other way is to specify input_data via MCE::Stream\->init. This prevents MCE::Stream from configuring the iterator reference as another user task which will not work. .PP Iterators are described under section \*(L"\s-1SYNTAX\s0 for \s-1INPUT_DATA\*(R"\s0 at MCE::Core. 
.PP .Vb 3 \& MCE::Stream\->init( \& input_data => iterator \& ); \& \& my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 }; .Ve .SH "MANUAL SHUTDOWN" .IX Header "MANUAL SHUTDOWN" .IP "MCE::Stream\->finish" 3 .IX Item "MCE::Stream->finish" .PD 0 .IP "MCE::Stream::finish" 3 .IX Item "MCE::Stream::finish" .PD .PP Workers remain persistent as much as possible after running. Shutdown occurs automatically when the script terminates. Call finish when workers are no longer needed. .PP .Vb 1 \& use MCE::Stream; \& \& MCE::Stream\->init( \& chunk_size => 20, max_workers => \*(Aqauto\*(Aq \& ); \& \& my @a = mce_stream { ... } 1..100; \& \& MCE::Stream\->finish; .Ve .SH "INDEX" .IX Header "INDEX" \&\s-1MCE\s0, MCE::Core .SH "AUTHOR" .IX Header "AUTHOR" Mario E. Roy,