.\" Automatically generated by Pod::Man 4.10 (Pod::Simple 3.35) .\" .\" Standard preamble: .\" ======================================================================== .de Sp \" Vertical space (when we can't use .PP) .if t .sp .5v .if n .sp .. .de Vb \" Begin verbatim text .ft CW .nf .ne \\$1 .. .de Ve \" End verbatim text .ft R .fi .. .\" Set up some character translations and predefined strings. \*(-- will .\" give an unbreakable dash, \*(PI will give pi, \*(L" will give a left .\" double quote, and \*(R" will give a right double quote. \*(C+ will .\" give a nicer C++. Capital omega is used to do unbreakable dashes and .\" therefore won't be available. \*(C` and \*(C' expand to `' in nroff, .\" nothing in troff, for use with C<>. .tr \(*W- .ds C+ C\v'-.1v'\h'-1p'\s-2+\h'-1p'+\s0\v'.1v'\h'-1p' .ie n \{\ . ds -- \(*W- . ds PI pi . if (\n(.H=4u)&(1m=24u) .ds -- \(*W\h'-12u'\(*W\h'-12u'-\" diablo 10 pitch . if (\n(.H=4u)&(1m=20u) .ds -- \(*W\h'-12u'\(*W\h'-8u'-\" diablo 12 pitch . ds L" "" . ds R" "" . ds C` "" . ds C' "" 'br\} .el\{\ . ds -- \|\(em\| . ds PI \(*p . ds L" `` . ds R" '' . ds C` . ds C' 'br\} .\" .\" Escape single quotes in literal strings from groff's Unicode transform. .ie \n(.g .ds Aq \(aq .el .ds Aq ' .\" .\" If the F register is >0, we'll generate index entries on stderr for .\" titles (.TH), headers (.SH), subsections (.SS), items (.Ip), and index .\" entries marked with X<> in POD. Of course, you'll have to process the .\" output yourself in some meaningful fashion. .\" .\" Avoid warning from groff about undefined register 'F'. .de IX .. .nr rF 0 .if \n(.g .if rF .nr rF 1 .if (\n(rF:(\n(.g==0)) \{\ . if \nF \{\ . de IX . tm Index:\\$1\t\\n%\t"\\$2" .. . if !\nF==2 \{\ . nr % 0 . nr F 2 . \} . \} .\} .rr rF .\" ======================================================================== .\" .IX Title "MCE::Relay 3pm" .TH MCE::Relay 3pm "2019-01-30" "perl v5.28.1" "User Contributed Perl Documentation" .\" For nroff, turn off justification. 
Always turn off hyphenation; it makes .\" way too many mistakes in technical documents. .if n .ad l .nh .SH "NAME" MCE::Relay \- Extends Many\-Core Engine with relay capabilities .SH "VERSION" .IX Header "VERSION" This document describes MCE::Relay version 1.838 .SH "SYNOPSIS" .IX Header "SYNOPSIS" .Vb 1 \& use MCE::Flow; \& \& my $file = shift || \e*STDIN; \& \& ## Line Count ####################################### \& \& mce_flow_f { \& max_workers => 4, \& use_slurpio => 1, \& init_relay => 0, \& }, \& sub { \& my ($mce, $slurp_ref, $chunk_id) = @_; \& my $line_count = ($$slurp_ref =~ tr/\en//); \& \& ## Receive and pass on updated information. \& my $lines_read = MCE::relay { $_ += $line_count }; \& \& }, $file; \& \& my $total_lines = MCE\->relay_final; \& \& print {*STDERR} "$total_lines\en"; \& \& ## Orderly Action ################################### \& \& $| = 1; # Important, must flush output immediately. \& \& mce_flow_f { \& max_workers => 2, \& use_slurpio => 1, \& init_relay => 0, \& }, \& sub { \& my ($mce, $slurp_ref, $chunk_id) = @_; \& \& ## The relay value is relayed and remains 0. \& ## Writes to STDOUT orderly. \& \& MCE\->relay_lock; \& print $$slurp_ref; \& MCE\->relay_unlock; \& \& }, $file; .Ve .SH "DESCRIPTION" .IX Header "DESCRIPTION" This module enables workers to receive and pass on information orderly with zero involvement by the manager process while running. The module is loaded automatically when \s-1MCE\s0 option \f(CW\*(C`init_relay\*(C'\fR is specified. .PP All workers (belonging to task_id 0) must participate when relaying data. .PP Relaying is not meant for passing big data. The last worker will stall if the relayed data exceeds the buffer size for the socket. Keeping the relayed data within 16 KiB \- 7 bytes is safe across all platforms. 
.SH "API DOCUMENTATION" .IX Header "API DOCUMENTATION" .IP "\s-1MCE\-\s0>relay ( sub { code } )" 3 .IX Item "MCE->relay ( sub { code } )" .PD 0 .IP "MCE::relay { code }" 3 .IX Item "MCE::relay { code }" .PD Relay is enabled by specifying the init_relay option which takes a hash or array reference, or a scalar value. Relaying is orderly and driven by chunk_id when processing data, otherwise task_wid. Omitting the code block (e.g. MCE::relay) relays forward. .Sp Below, relaying multiple values via a \s-1HASH\s0 reference. .Sp .Vb 1 \& use MCE::Flow max_workers => 4; \& \& mce_flow { \& init_relay => { p => 0, e => 0 }, \& }, \& sub { \& my $wid = MCE\->wid; \& \& ## do work \& my $pass = $wid % 3; \& my $errs = $wid % 2; \& \& ## relay \& my %last_rpt = MCE::relay { $_\->{p} += $pass; $_\->{e} += $errs }; \& \& MCE\->print("$wid: passed $pass, errors $errs\en"); \& \& return; \& }; \& \& my %results = MCE\->relay_final; \& \& print " passed $results{p}, errors $results{e} final\en\en"; \& \& \-\- Output \& \& 1: passed 1, errors 1 \& 2: passed 2, errors 0 \& 3: passed 0, errors 1 \& 4: passed 1, errors 0 \& passed 4, errors 2 final .Ve .Sp Or multiple values via an \s-1ARRAY\s0 reference. .Sp .Vb 1 \& use MCE::Flow max_workers => 4; \& \& mce_flow { \& init_relay => [ 0, 0 ], \& }, \& sub { \& my $wid = MCE\->wid; \& \& ## do work \& my $pass = $wid % 3; \& my $errs = $wid % 2; \& \& ## relay \& my @last_rpt = MCE::relay { $_\->[0] += $pass; $_\->[1] += $errs }; \& \& MCE\->print("$wid: passed $pass, errors $errs\en"); \& \& return; \& }; \& \& my ($pass, $errs) = MCE\->relay_final; \& \& print " passed $pass, errors $errs final\en\en"; \& \& \-\- Output \& \& 1: passed 1, errors 1 \& 2: passed 2, errors 0 \& 3: passed 0, errors 1 \& 4: passed 1, errors 0 \& passed 4, errors 2 final .Ve .Sp Or simply a scalar value. 
.Sp .Vb 1 \& use MCE::Flow max_workers => 4; \& \& mce_flow { \& init_relay => 0, \& }, \& sub { \& my $wid = MCE\->wid; \& \& ## do work \& my $bytes_read = 1000 + ((MCE\->wid % 3) * 3); \& \& ## relay \& my $last_offset = MCE::relay { $_ += $bytes_read }; \& \& ## output \& MCE\->print("$wid: $bytes_read\en"); \& \& return; \& }; \& \& my $total = MCE\->relay_final; \& \& print " $total size\en\en"; \& \& \-\- Output \& \& 1: 1003 \& 2: 1006 \& 3: 1000 \& 4: 1003 \& 4012 size .Ve .IP "\s-1MCE\-\s0>relay_final ( void )" 3 .IX Item "MCE->relay_final ( void )" Call this method to obtain the final relay value(s) after running. See included example findnull.pl for another use case. .Sp .Vb 1 \& use MCE max_workers => 4; \& \& my $mce = MCE\->new( \& init_relay => [ 0, 100 ], ## initial values (two counters) \& \& user_func => sub { \& my ($mce) = @_; \& \& ## do work \& my ($acc1, $acc2) = (10, 20); \& \& ## relay to next worker \& MCE::relay { $_\->[0] += $acc1; $_\->[1] += $acc2 }; \& \& return; \& } \& )\->run; \& \& my ($cnt1, $cnt2) = $mce\->relay_final; \& \& print "$cnt1 : $cnt2\en"; \& \& \-\- Output \& \& 40 : 180 .Ve .IP "\s-1MCE\-\s0>relay_recv ( void )" 3 .IX Item "MCE->relay_recv ( void )" Call this method to obtain the next relay value before relaying. This allows serial-code to be processed orderly between workers. The following is a parallel demonstration for the fasta-benchmark on the web. 
.Sp .Vb 1 \& # perl fasta.pl 25000000 \& \& # The Computer Language Benchmarks game \& # http://benchmarksgame.alioth.debian.org/ \& # \& # contributed by Barry Walsh \& # port of fasta.rb #6 \& # \& # MCE::Flow version by Mario Roy \& # requires MCE 1.807+ \& # requires MCE::Shared 1.806+ \& \& use strict; \& use warnings; \& use feature \*(Aqsay\*(Aq; \& \& use MCE::Flow; \& use MCE::Shared; \& use MCE::Candy; \& \& use constant IM => 139968; \& use constant IA => 3877; \& use constant IC => 29573; \& \& my $LAST = MCE::Shared\->scalar( 42 ); \& \& my $alu = \& \*(AqGGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG\*(Aq . \& \*(AqGAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA\*(Aq . \& \*(AqCCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT\*(Aq . \& \*(AqACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA\*(Aq . \& \*(AqGCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG\*(Aq . \& \*(AqAGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC\*(Aq . \& \*(AqAGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA\*(Aq; \& \& my $iub = [ \& [ \*(Aqa\*(Aq, 0.27 ], [ \*(Aqc\*(Aq, 0.12 ], [ \*(Aqg\*(Aq, 0.12 ], \& [ \*(Aqt\*(Aq, 0.27 ], [ \*(AqB\*(Aq, 0.02 ], [ \*(AqD\*(Aq, 0.02 ], \& [ \*(AqH\*(Aq, 0.02 ], [ \*(AqK\*(Aq, 0.02 ], [ \*(AqM\*(Aq, 0.02 ], \& [ \*(AqN\*(Aq, 0.02 ], [ \*(AqR\*(Aq, 0.02 ], [ \*(AqS\*(Aq, 0.02 ], \& [ \*(AqV\*(Aq, 0.02 ], [ \*(AqW\*(Aq, 0.02 ], [ \*(AqY\*(Aq, 0.02 ] \& ]; \& \& my $homosapiens = [ \& [ \*(Aqa\*(Aq, 0.3029549426680 ], \& [ \*(Aqc\*(Aq, 0.1979883004921 ], \& [ \*(Aqg\*(Aq, 0.1975473066391 ], \& [ \*(Aqt\*(Aq, 0.3015094502008 ] \& ]; \& \& sub make_repeat_fasta { \& my ( $src, $n ) = @_; \& my $width = qr/(.{1,60})/; \& my $l = length $src; \& my $s = $src x ( ($n / $l) + 1 ); \& substr( $s, $n, $l ) = \*(Aq\*(Aq; \& \& while ( $s =~ m/$width/g ) { say $1 } \& } \& \& sub make_random_fasta { \& my ( $table, $n ) = @_; \& my $rand = undef; \& my $width = 60; \& my $prob = 0.0; \& my $output = \*(Aq\*(Aq; \& my ( $c1, $c2, $last ); \& \& $_\->[1] = ( $prob += $_\->[1] ) for @$table; \& \& $c1 = 
\*(Aq$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;\*(Aq; \& $c1 .= "\e$output .= \*(Aq$_\->[0]\*(Aq, next if $_\->[1] > \e$rand;\en" for @$table; \& \& my $seq = MCE::Shared\->sequence( \& { chunk_size => 2000, bounds_only => 1 }, \& 1, $n / $width \& ); \& \& my $code1 = q{ \& while ( 1 ) { \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& # Process code orderly between workers. \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& \& my $chunk_id = MCE\->relay_recv; \& my ( $begin, $end ) = $seq\->next; \& \& MCE\->relay, last if ( !defined $begin ); \& \& my $last = $LAST\->get; \& my $temp = $last; \& \& # Pre\-compute $LAST value for the next worker \& for ( 1 .. ( $end \- $begin + 1 ) * $width ) { \& $temp = ( $temp * IA + IC ) % IM; \& } \& \& $LAST\->set( $temp ); \& \& # Increment chunk_id value \& MCE\->relay( sub { $_ += 1 } ); \& \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& # Also run code in parallel between workers. \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& \& for ( $begin .. $end ) { \& for ( 1 .. $width ) { !C! } \& $output .= "\en"; \& } \& \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& # Display orderly. 
\& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& \& MCE\->gather( $chunk_id, $output ); \& \& $output = \*(Aq\*(Aq; \& } \& }; \& \& $code1 =~ s/!C!/$c1/g; \& \& MCE::Flow\->init( \& max_workers => 4, ## MCE::Util\->get_ncpu || 4, \& gather => MCE::Candy::out_iter_fh( \e*STDOUT ), \& init_relay => 1, \& use_threads => 0, \& ); \& \& MCE::Flow\->run( sub { eval $code1 } ); \& MCE::Flow\->finish; \& \& $last = $LAST\->get; \& \& $c2 = \*(Aq$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;\*(Aq; \& $c2 .= "print(\*(Aq$_\->[0]\*(Aq), next if $_\->[1] > \e$rand;\en" for @$table; \& \& my $code2 = q{ \& if ( $n % $width != 0 ) { \& for ( 1 .. $n % $width ) { !C! } \& print "\en"; \& } \& }; \& \& $code2 =~ s/!C!/$c2/g; \& eval $code2; \& \& $LAST\->set( $last ); \& } \& \& my $n = $ARGV[0] || 27; \& \& say ">ONE Homo sapiens alu"; \& make_repeat_fasta( $alu, $n * 2 ); \& \& say ">TWO IUB ambiguity codes"; \& make_random_fasta( $iub, $n * 3 ); \& \& say ">THREE Homo sapiens frequency"; \& make_random_fasta( $homosapiens, $n * 5 ); .Ve .IP "\s-1MCE\-\s0>relay_lock ( void )" 3 .IX Item "MCE->relay_lock ( void )" .PD 0 .IP "\s-1MCE\-\s0>relay_unlock ( void )" 3 .IX Item "MCE->relay_unlock ( void )" .PD The \f(CW\*(C`relay_lock\*(C'\fR and \f(CW\*(C`relay_unlock\*(C'\fR methods, added to \s-1MCE 1.807,\s0 are aliases for \f(CW\*(C`relay_recv\*(C'\fR and \f(CW\*(C`relay\*(C'\fR respectively. They allow one to perform an exclusive action prior to actual relaying of data. .Sp Below, \f(CW\*(C`user_func\*(C'\fR is taken from the \f(CW\*(C`cat.pl\*(C'\fR \s-1MCE\s0 example. Relaying is driven by \f(CW\*(C`chunk_id\*(C'\fR or \f(CW\*(C`task_wid\*(C'\fR when not processing input, thus occurs orderly. .Sp .Vb 2 \& user_func => sub { \& my ($mce, $chunk_ref, $chunk_id) = @_; \& \& if ($n_flag) { \& ## Relays the total lines read. 
\& \& my $output = \*(Aq\*(Aq; my $line_count = ($$chunk_ref =~ tr/\en//); \& my $lines_read = MCE::relay { $_ += $line_count }; \& \& open my $fh, \*(Aq<\*(Aq, $chunk_ref; \& $output .= sprintf "%6d\et%s", ++$lines_read, $_ while (<$fh>); \& close $fh; \& \& $output .= ":$chunk_id"; \& MCE\->do(\*(Aqdisplay_chunk\*(Aq, $output); \& } \& else { \& ## The following is another way to have ordered output. Workers \& ## write directly to STDOUT exclusively without any involvement \& ## from the manager process. The statement(s) between relay_lock \& ## and relay_unlock run serially and most important orderly. \& \& MCE\->relay_lock; # alias for MCE\->relay_recv \& \& print $$chunk_ref; # ensure $| = 1 in script \& \& MCE\->relay_unlock; # alias for MCE\->relay \& } \& \& return; \& } .Ve .Sp The following is a variant of the fasta-benchmark demonstration shown above. Here, workers write exclusively and orderly to \f(CW\*(C`STDOUT\*(C'\fR. .Sp .Vb 1 \& # perl fasta.pl 25000000 \& \& # The Computer Language Benchmarks game \& # http://benchmarksgame.alioth.debian.org/ \& # \& # contributed by Barry Walsh \& # port of fasta.rb #6 \& # \& # MCE::Flow version by Mario Roy \& # requires MCE 1.807+ \& # requires MCE::Shared 1.806+ \& \& use strict; \& use warnings; \& use feature \*(Aqsay\*(Aq; \& \& use MCE::Flow; \& use MCE::Shared; \& \& use constant IM => 139968; \& use constant IA => 3877; \& use constant IC => 29573; \& \& my $LAST = MCE::Shared\->scalar( 42 ); \& \& my $alu = \& \*(AqGGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG\*(Aq . \& \*(AqGAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA\*(Aq . \& \*(AqCCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT\*(Aq . \& \*(AqACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA\*(Aq . \& \*(AqGCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG\*(Aq . \& \*(AqAGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC\*(Aq . 
\& \*(AqAGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA\*(Aq; \& \& my $iub = [ \& [ \*(Aqa\*(Aq, 0.27 ], [ \*(Aqc\*(Aq, 0.12 ], [ \*(Aqg\*(Aq, 0.12 ], \& [ \*(Aqt\*(Aq, 0.27 ], [ \*(AqB\*(Aq, 0.02 ], [ \*(AqD\*(Aq, 0.02 ], \& [ \*(AqH\*(Aq, 0.02 ], [ \*(AqK\*(Aq, 0.02 ], [ \*(AqM\*(Aq, 0.02 ], \& [ \*(AqN\*(Aq, 0.02 ], [ \*(AqR\*(Aq, 0.02 ], [ \*(AqS\*(Aq, 0.02 ], \& [ \*(AqV\*(Aq, 0.02 ], [ \*(AqW\*(Aq, 0.02 ], [ \*(AqY\*(Aq, 0.02 ] \& ]; \& \& my $homosapiens = [ \& [ \*(Aqa\*(Aq, 0.3029549426680 ], \& [ \*(Aqc\*(Aq, 0.1979883004921 ], \& [ \*(Aqg\*(Aq, 0.1975473066391 ], \& [ \*(Aqt\*(Aq, 0.3015094502008 ] \& ]; \& \& sub make_repeat_fasta { \& my ( $src, $n ) = @_; \& my $width = qr/(.{1,60})/; \& my $l = length $src; \& my $s = $src x ( ($n / $l) + 1 ); \& substr( $s, $n, $l ) = \*(Aq\*(Aq; \& \& while ( $s =~ m/$width/g ) { say $1 } \& } \& \& sub make_random_fasta { \& my ( $table, $n ) = @_; \& my $rand = undef; \& my $width = 60; \& my $prob = 0.0; \& my $output = \*(Aq\*(Aq; \& my ( $c1, $c2, $last ); \& \& $_\->[1] = ( $prob += $_\->[1] ) for @$table; \& \& $c1 = \*(Aq$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;\*(Aq; \& $c1 .= "\e$output .= \*(Aq$_\->[0]\*(Aq, next if $_\->[1] > \e$rand;\en" for @$table; \& \& my $seq = MCE::Shared\->sequence( \& { chunk_size => 2000, bounds_only => 1 }, \& 1, $n / $width \& ); \& \& my $code1 = q{ \& $| = 1; # Important, must flush output immediately. \& \& while ( 1 ) { \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& # Process code orderly between workers. \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& \& MCE\->relay_lock; \& \& my ( $begin, $end ) = $seq\->next; \& print( $output ), $output = \*(Aq\*(Aq if ( length $output ); \& \& MCE\->relay_unlock, last if ( !defined $begin ); \& \& my $last = $LAST\->get; \& my $temp = $last; \& \& # Pre\-compute $LAST value for the next worker \& for ( 1 .. 
( $end \- $begin + 1 ) * $width ) { \& $temp = ( $temp * IA + IC ) % IM; \& } \& \& $LAST\->set( $temp ); \& \& MCE\->relay_unlock; \& \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& # Also run code in parallel. \& # \-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\-\- \& \& for ( $begin .. $end ) { \& for ( 1 .. $width ) { !C! } \& $output .= "\en"; \& } \& } \& }; \& \& $code1 =~ s/!C!/$c1/g; \& \& MCE::Flow\->init( \& max_workers => 4, ## MCE::Util\->get_ncpu || 4, \& init_relay => 0, \& use_threads => 0, \& ); \& \& MCE::Flow\->run( sub { eval $code1 } ); \& MCE::Flow\->finish; \& \& $last = $LAST\->get; \& \& $c2 = \*(Aq$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;\*(Aq; \& $c2 .= "print(\*(Aq$_\->[0]\*(Aq), next if $_\->[1] > \e$rand;\en" for @$table; \& \& my $code2 = q{ \& if ( $n % $width != 0 ) { \& for ( 1 .. $n % $width ) { !C! } \& print "\en"; \& } \& }; \& \& $code2 =~ s/!C!/$c2/g; \& eval $code2; \& \& $LAST\->set( $last ); \& } \& \& my $n = $ARGV[0] || 27; \& \& say ">ONE Homo sapiens alu"; \& make_repeat_fasta( $alu, $n * 2 ); \& \& say ">TWO IUB ambiguity codes"; \& make_random_fasta( $iub, $n * 3 ); \& \& say ">THREE Homo sapiens frequency"; \& make_random_fasta( $homosapiens, $n * 5 ); .Ve .SH "GATHER AND RELAY DEMONSTRATIONS" .IX Header "GATHER AND RELAY DEMONSTRATIONS" I received a request from John Martel to process a large flat file and expand each record to many records based on splitting out items in field 4 delimited by semicolons. Each row in the output is given a unique \s-1ID\s0 starting with one while preserving output order. 
.IP "Input File, possibly larger than 500 GiB in size" 3 .IX Item "Input File, possibly larger than 500 GiB in size" .Vb 4 \& foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7 \& bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7 \& baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7 \& ... .Ve .IP "Output File" 3 .IX Item "Output File" .Vb 10 \& 000000000000001|item1|foo|field2|field3|field5|field6|field7 \& 000000000000002|item2|foo|field2|field3|field5|field6|field7 \& 000000000000003|item3|foo|field2|field3|field5|field6|field7 \& 000000000000004|item4|foo|field2|field3|field5|field6|field7 \& 000000000000005|itemN|foo|field2|field3|field5|field6|field7 \& 000000000000006|item1|bar|field2|field3|field5|field6|field7 \& 000000000000007|item2|bar|field2|field3|field5|field6|field7 \& 000000000000008|item3|bar|field2|field3|field5|field6|field7 \& 000000000000009|item4|bar|field2|field3|field5|field6|field7 \& 000000000000010|itemN|bar|field2|field3|field5|field6|field7 \& 000000000000011|item1|baz|field2|field3|field5|field6|field7 \& 000000000000012|item2|baz|field2|field3|field5|field6|field7 \& 000000000000013|item3|baz|field2|field3|field5|field6|field7 \& 000000000000014|item4|baz|field2|field3|field5|field6|field7 \& 000000000000015|itemN|baz|field2|field3|field5|field6|field7 \& ... .Ve .IP "Example One" 3 .IX Item "Example One" This example configures a custom function for preserving output order. Unfortunately, the sprintf function alone involves extra \s-1CPU\s0 time causing the manager process to fall behind. Thus, workers may idle while waiting for the manager process to respond to the gather request. 
.Sp .Vb 2 \& use strict; \& use warnings; \& \& use MCE::Loop; \& \& my $infile = shift or die "Usage: $0 infile\en"; \& my $newfile = \*(Aqoutput.dat\*(Aq; \& \& open my $fh_out, \*(Aq>\*(Aq, $newfile or die "open error $newfile: $!\en"; \& \& sub preserve_order { \& my ($fh) = @_; \& my ($order_id, $start_idx, $idx, %tmp) = (1, 1); \& \& return sub { \& my ($chunk_id, $aref) = @_; \& $tmp{ $chunk_id } = $aref; \& \& while ( my $aref = delete $tmp{ $order_id } ) { \& foreach my $line ( @{ $aref } ) { \& $idx = sprintf "%015d", $start_idx++; \& print $fh $idx, $line; \& } \& $order_id++; \& } \& } \& } \& \& MCE::Loop::init { \& chunk_size => \*(Aqauto\*(Aq, max_workers => 3, \& gather => preserve_order($fh_out) \& }; \& \& mce_loop_f { \& my ($mce, $chunk_ref, $chunk_id) = @_; \& my @buf; \& \& foreach my $line (@{ $chunk_ref }) { \& $line =~ s/\er//g; chomp $line; \& \& my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\e|/, $line; \& my @items_array = split /;/, $items; \& \& foreach my $item (@items_array) { \& push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\en"; \& } \& } \& \& MCE\->gather($chunk_id, \e@buf); \& \& } $infile; \& \& MCE::Loop::finish(); \& close $fh_out; .Ve .IP "Example Two" 3 .IX Item "Example Two" In this example, workers obtain the current \s-1ID\s0 value and increment/relay for the next worker, ordered by chunk \s-1ID\s0 behind the scenes. Workers call sprintf in parallel, allowing the manager process (out_iter_fh) to accommodate up to 32 workers and not fall behind. .Sp Relay accounts for the worker handling the next chunk_id value. Therefore, do not call relay more than once per chunk. Doing so will cause \s-1IPC\s0 to stall. 
.Sp .Vb 2 \& use strict; \& use warnings; \& \& use MCE::Loop; \& use MCE::Candy; \& \& my $infile = shift or die "Usage: $0 infile\en"; \& my $newfile = \*(Aqoutput.dat\*(Aq; \& \& open my $fh_out, \*(Aq>\*(Aq, $newfile or die "open error $newfile: $!\en"; \& \& MCE::Loop::init { \& chunk_size => \*(Aqauto\*(Aq, max_workers => 8, \& gather => MCE::Candy::out_iter_fh($fh_out), \& init_relay => 1 \& }; \& \& mce_loop_f { \& my ($mce, $chunk_ref, $chunk_id) = @_; \& my @lines; \& \& foreach my $line (@{ $chunk_ref }) { \& $line =~ s/\er//g; chomp $line; \& \& my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\e|/, $line; \& my @items_array = split /;/, $items; \& \& foreach my $item (@items_array) { \& push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\en"; \& } \& } \& \& my $idx = MCE::relay { $_ += scalar @lines }; \& my $buf = \*(Aq\*(Aq; \& \& foreach my $line ( @lines ) { \& $buf .= sprintf "%015d|%s", $idx++, $line \& } \& \& MCE\->gather($chunk_id, $buf); \& \& } $infile; \& \& MCE::Loop::finish(); \& close $fh_out; .Ve .SH "INDEX" .IX Header "INDEX" \&\s-1MCE\s0, MCE::Core .SH "AUTHOR" .IX Header "AUTHOR" Mario E. Roy,