D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
usr
/
local
/
share
/
perl5
/
MCE
/
Core
/
Filename :
Worker.pm
back
Copy
############################################################################### ## ---------------------------------------------------------------------------- ## Core methods for the worker process. ## ## This package provides main, loop, and relevant methods used internally by ## the worker process. ## ## There is no public API. ## ############################################################################### package MCE::Core::Worker; use strict; use warnings; our $VERSION = '1.835'; my $_has_threads = $INC{'threads.pm'} ? 1 : 0; my $_tid = $_has_threads ? threads->tid() : 0; sub CLONE { $_tid = threads->tid() if $_has_threads; } ## Items below are folded into MCE. package # hide from rpm MCE; no warnings qw( threads recursion uninitialized ); use bytes; ############################################################################### ## ---------------------------------------------------------------------------- ## Internal do, gather and send related functions for serializing data to ## destination. User functions for handling gather, queue or void. ## ############################################################################### { my ( $_dest, $_len, $_tag, $_task_id, $_user_func, $_val, $_wa, $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_lock_chn, $_dat_ex, $_dat_un ); ## Create array structure containing various send functions. my @_dest_function = (); $_dest_function[SENDTO_FILEV2] = sub { ## Content >> File return unless (defined $_val); local $\ = undef if (defined $\); if (length ${ $_[0] }) { $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_F_SND.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_val.$LF . length(${ $_[0] }).$LF, ${ $_[0] }); $_dat_un->() if $_lock_chn; } return; }; $_dest_function[SENDTO_FD] = sub { ## Content >> File descriptor return unless (defined $_val); local $\ = undef if (defined $\); if (length ${ $_[0] }) { $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_D_SND.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_val.$LF . length(${ $_[0] }).$LF, ${ $_[0] }); $_dat_un->() if $_lock_chn; } return; }; $_dest_function[SENDTO_STDOUT] = sub { ## Content >> STDOUT local $\ = undef if (defined $\); if (length ${ $_[0] }) { $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_O_SND.$LF . $_chn.$LF), print({$_DAU_W_SOCK} length(${ $_[0] }).$LF, ${ $_[0] }); $_dat_un->() if $_lock_chn; } return; }; $_dest_function[SENDTO_STDERR] = sub { ## Content >> STDERR local $\ = undef if (defined $\); if (length ${ $_[0] }) { $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_E_SND.$LF . $_chn.$LF), print({$_DAU_W_SOCK} length(${ $_[0] }).$LF, ${ $_[0] }); $_dat_un->() if $_lock_chn; } return; }; ## ------------------------------------------------------------------------- sub _do_callback { my ($self, $_buf, $_aref); ($self, $_val, $_aref) = @_; unless (defined wantarray) { $_wa = WANTS_UNDEF; } elsif (wantarray) { $_wa = WANTS_ARRAY; } else { $_wa = WANTS_SCALAR; } local $\ = undef if (defined $\); ## Crossover: Send arguments if ( ! @{ $_aref } ) { $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_N_CBK.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF); } elsif ( @{ $_aref } == 1 && !ref $_aref->[0] && defined $_aref->[0] && !looks_like_number($_aref->[0]) ) { $_len = length $_aref->[0]; $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_S_CBK.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF . $_len.$LF, $_aref->[0]); } else { $_buf = $self->{freeze}($_aref); $_len = length $_buf; $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_A_CBK.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF . $_len.$LF, $_buf); } ## Crossover: Receive value if ( $_wa ) { local $/ = $LF if ($/ ne $LF); chomp(my $_len = <$_DAU_W_SOCK>); my $_frozen = chop($_len); read $_DAU_W_SOCK, my($_buf), $_len; $_dat_un->() if $_lock_chn; return ( $_wa != WANTS_ARRAY ) ? $_frozen ? ($self->{thaw}($_buf))->[0] : $_buf : @{ $self->{thaw}($_buf) }; } $_dat_un->() if $_lock_chn; } ## ------------------------------------------------------------------------- sub _do_gather { my $_buf; my ($self, $_aref) = @_; return unless (scalar @{ $_aref }); if (scalar @{ $_aref } > 1 || ref $_aref->[0] || !defined $_aref->[0] || looks_like_number $_aref->[0]) { $_tag = OUTPUT_A_GTR; $_buf = $self->{freeze}($_aref); $_len = length $_buf; } else { $_tag = OUTPUT_S_GTR; if (defined $_aref->[0]) { $_len = length $_aref->[0]; local $\ = undef if (defined $\); $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} $_tag.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_task_id.$LF . $_len.$LF, $_aref->[0]); $_dat_un->() if $_lock_chn; return; } else { $_buf = ''; $_len = -1; } } local $\ = undef if (defined $\); $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} $_tag.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_task_id.$LF . $_len.$LF, $_buf); $_dat_un->() if $_lock_chn; return; } ## ------------------------------------------------------------------------- sub _do_send { my $_data_ref; my $self = shift; $_dest = shift; $_val = shift; if (scalar @_ > 1) { $_data_ref = \join('', @_); } elsif (my $_ref = ref $_[0]) { if ($_ref eq 'SCALAR') { $_data_ref = $_[0]; } elsif ($_ref eq 'ARRAY') { $_data_ref = \join('', @{ $_[0] }); } elsif ($_ref eq 'HASH') { $_data_ref = \join('', %{ $_[0] }); } else { $_data_ref = \join('', @_); } } else { $_data_ref = \$_[0]; } $_dest_function[$_dest]($_data_ref); return; } sub _do_send_glob { my ($self, $_glob, $_fd, $_data_ref) = @_; if ($self->{_wid} > 0) { if ($_fd == 1) { _do_send($self, SENDTO_STDOUT, undef, $_data_ref); } elsif ($_fd == 2) { _do_send($self, SENDTO_STDERR, undef, $_data_ref); } else { _do_send($self, SENDTO_FD, $_fd, $_data_ref); } } else { require Symbol unless $INC{'Symbol.pm'}; my $_fh = Symbol::qualify_to_ref($_glob, caller); local $\ = undef if (defined $\); print {$_fh} ${ $_data_ref }; } return; } ## ------------------------------------------------------------------------- sub _do_send_init { my ($self) = @_; $_chn = $self->{_chn}; $_DAT_LOCK = $self->{_dat_lock}; $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; $_lock_chn = $self->{_lock_chn}; $_task_id = $self->{_task_id}; if ($_lock_chn) { # inlined for performance $_dat_ex = sub { my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 unless $_DAT_LOCK->{ $_pid }; }; $_dat_un = sub { my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 if $_DAT_LOCK->{ $_pid }; }; } { local $!; # IO::Handle->autoflush not available in older Perl. select(( select(*STDERR), $| = 1 )[0]) if defined(fileno *STDERR); select(( select(*STDOUT), $| = 1 )[0]) if defined(fileno *STDOUT); } return; } sub _do_send_clear { my ($self) = @_; $_dest = $_len = $_task_id = $_user_func = $_val = $_wa = undef; $_DAT_LOCK = $_DAT_W_SOCK = $_DAU_W_SOCK = $_chn = $_lock_chn = undef; $_dat_ex = $_dat_un = $_tag = undef; return; } ## ------------------------------------------------------------------------- sub _do_user_func { my ($self, $_chunk, $_chunk_id) = @_; my $_size = 0; if ($self->{progress} && $self->{_task_id} == 0) { # use_slurpio if (ref $_chunk eq 'SCALAR') { $_size += length ${ $_chunk }; } # sequence and bounds_only elsif ($self->{sequence} && $self->{bounds_only}) { my $_seq = $self->{sequence}; my $_step = (ref $_seq eq 'ARRAY') ? $_seq->[2] : $_seq->{step}; $_size += int(abs($_chunk->[0] - $_chunk->[1]) / abs($_step)) + 1; } # workers clear {input_data} to conserve memory when array ref # otherwise, /path/to/infile or scalar reference elsif ($self->{input_data}) { map { $_size += length } @{ $_chunk }; } # array or sequence else { $_size += (ref $_chunk eq 'ARRAY') ? @{ $_chunk } : 1; } } $self->{_retry} = [ $_chunk, $_chunk_id, $self->{max_retries} ] if ($self->{max_retries}); $self->{_chunk_id} = $_chunk_id; $_user_func->($self, $_chunk, $_chunk_id); if ($self->{progress} && $self->{_task_id} == 0) { local $\ = undef if (defined $\); $_dat_ex->() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_P_NFY.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_size.$LF); $_dat_un->() if $_lock_chn; } return; } sub _do_user_func_init { my ($self) = @_; $_user_func = $self->{user_func}; return; } } ############################################################################### ## ---------------------------------------------------------------------------- ## Worker process -- Do. ## ############################################################################### sub _worker_do { my ($self, $_params_ref) = @_; @_ = (); ## Set options. $self->{_abort_msg} = $_params_ref->{_abort_msg}; $self->{_run_mode} = $_params_ref->{_run_mode}; $self->{_single_dim} = $_params_ref->{_single_dim}; $self->{use_slurpio} = $_params_ref->{_use_slurpio}; $self->{parallel_io} = $_params_ref->{_parallel_io}; $self->{progress} = $_params_ref->{_progress}; $self->{max_retries} = $_params_ref->{_max_retries}; $self->{RS} = $_params_ref->{_RS}; _do_user_func_init($self); ## Init local vars. my $_chn = $self->{_chn}; my $_DAT_LOCK = $self->{_dat_lock}; my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0]; my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn]; my $_lock_chn = $self->{_lock_chn}; my $_run_mode = $self->{_run_mode}; my $_task_id = $self->{_task_id}; my $_task_name = $self->{task_name}; ## Do not override params if defined in user_tasks during instantiation. for my $_p (qw(bounds_only chunk_size sequence user_args)) { if (defined $_params_ref->{"_${_p}"}) { $self->{$_p} = $_params_ref->{"_${_p}"} unless (defined $self->{_task}->{$_p}); } } ## Assign user function. $self->{_wuf} = \&_do_user_func; ## Call user_begin if defined. if (defined $self->{user_begin}) { $self->{_chunk_id} = 0; $self->{user_begin}($self, $_task_id, $_task_name); $self->sync() if ($_task_id == 0 && defined $self->{init_relay}); } ## Retry chunk if previous attempt died. if ($self->{_retry}) { $self->{_chunk_id} = $self->{_retry}->[1]; $self->{user_func}->($self, $self->{_retry}->[0], $self->{_retry}->[1]); delete $self->{_retry}; } ## Call worker function. if ($_run_mode eq 'sequence') { require MCE::Core::Input::Sequence unless $INC{'MCE/Core/Input/Sequence.pm'}; _worker_sequence_queue($self); } elsif (defined $self->{_task}->{sequence}) { require MCE::Core::Input::Generator unless $INC{'MCE/Core/Input/Generator.pm'}; _worker_sequence_generator($self); } elsif ($_run_mode eq 'array') { require MCE::Core::Input::Request unless $INC{'MCE/Core/Input/Request.pm'}; _worker_request_chunk($self, REQUEST_ARRAY); } elsif ($_run_mode eq 'glob') { require MCE::Core::Input::Request unless $INC{'MCE/Core/Input/Request.pm'}; _worker_request_chunk($self, REQUEST_GLOB); } elsif ($_run_mode eq 'hash') { require MCE::Core::Input::Request unless $INC{'MCE/Core/Input/Request.pm'}; _worker_request_chunk($self, REQUEST_HASH); } elsif ($_run_mode eq 'iterator') { require MCE::Core::Input::Iterator unless $INC{'MCE/Core/Input/Iterator.pm'}; _worker_user_iterator($self); } elsif ($_run_mode eq 'file') { require MCE::Core::Input::Handle unless $INC{'MCE/Core/Input/Handle.pm'}; _worker_read_handle($self, READ_FILE, $_params_ref->{_input_file}); } elsif ($_run_mode eq 'memory') { require MCE::Core::Input::Handle unless $INC{'MCE/Core/Input/Handle.pm'}; _worker_read_handle($self, READ_MEMORY, $self->{input_data}); } elsif (defined $self->{user_func}) { $self->{_chunk_id} = 0; $self->{user_func}->($self); } undef $self->{_next_jmp} if (defined $self->{_next_jmp}); undef $self->{_last_jmp} if (defined $self->{_last_jmp}); undef $self->{user_data} if (defined $self->{user_data}); ## Call user_end if defined. if (defined $self->{user_end}) { $self->{_chunk_id} = 0; $self->sync() if ($_task_id == 0 && defined $self->{init_relay}); $self->{user_end}($self, $_task_id, $_task_name); } delete $self->{_wuf}; ## Check nested Hobo workers not yet joined. MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'}; ## Notify the main process a worker has completed. local $\ = undef if (defined $\); $_DAT_LOCK->lock() if $_lock_chn; print({$_DAT_W_SOCK} OUTPUT_W_DNE.$LF . $_chn.$LF), print({$_DAU_W_SOCK} $_task_id.$LF); $_DAT_LOCK->unlock() if $_lock_chn; if ($^O eq 'MSWin32') { lock $self->{_run_lock}; } return; } ############################################################################### ## ---------------------------------------------------------------------------- ## Worker process -- Loop. ## ############################################################################### sub _worker_loop { my ($self) = @_; @_ = (); my ($_response, $_len, $_buf, $_params_ref); my $_COM_LOCK = $self->{_com_lock}; my $_COM_W_SOCK = $self->{_com_w_sock}; my $_job_delay = $self->{job_delay}; my $_wid = $self->{_wid}; if ($^O eq 'MSWin32') { lock $MCE::_WIN_LOCK; } while (1) { { local $\ = undef; local $/ = $LF; $_COM_LOCK->lock(); ## Wait for the next job request. $_response = <$_COM_W_SOCK>; print {$_COM_W_SOCK} $_wid.$LF; ## Return if instructed to exit. if ($_response eq "_exit\n") { $_COM_LOCK->unlock(); return; } ## Process send request. if ($_response eq "_data\n") { chomp($_len = <$_COM_W_SOCK>), read($_COM_W_SOCK, $_buf, $_len); print {$_COM_W_SOCK} $LF; $_COM_LOCK->unlock(); $self->{user_data} = $self->{thaw}($_buf); undef $_buf; if (defined $_job_delay && $_job_delay > 0.0) { sleep $_job_delay * $_wid; } } ## Process normal request. elsif ($_response =~ /\d+/) { chomp($_len = <$_COM_W_SOCK>), read($_COM_W_SOCK, $_buf, $_len); print {$_COM_W_SOCK} $LF; $_COM_LOCK->unlock(); $_params_ref = $self->{thaw}($_buf); undef $_buf; } ## Leave loop if invalid response. else { last; } } ## Send request. _worker_do($self, {}), next if ($_response eq "_data\n"); ## Wait here until MCE completes job submission to all workers. 1 until sysread($self->{_bse_r_sock}, my($_b), 1) || ($! && !$!{'EINTR'}); ## Normal request. if (defined $_job_delay && $_job_delay > 0.0) { sleep $_job_delay * $_wid; } _worker_do($self, $_params_ref); undef $_params_ref; } ## Notify the main process a worker has ended. The following is executed ## when an invalid reply was received above (not likely to occur). $_COM_LOCK->unlock(); die "Worker ($self->{_wid}) has ended prematurely"; } ############################################################################### ## ---------------------------------------------------------------------------- ## Worker process -- Main. ## ############################################################################### sub _worker_main { my ( $self, $_wid, $_task, $_task_id, $_task_wid, $_params, $_plugin_worker_init ) = @_; @_ = (); if (exists $self->{input_data}) { my $_ref = ref $self->{input_data}; delete $self->{input_data} if ($_ref && $_ref ne 'SCALAR'); } $self->{_task_id} = (defined $_task_id ) ? $_task_id : 0; $self->{_task_wid} = (defined $_task_wid) ? $_task_wid : $_wid; $self->{_task} = $_task; $self->{_wid} = $_wid; ## Define exit pid and DIE handler. my $_use_threads = (defined $_task->{use_threads}) ? $_task->{use_threads} : $self->{use_threads}; if ($_has_threads && $_use_threads) { $self->{_exit_pid} = 'TID_' . $_tid; } else { $self->{_exit_pid} = 'PID_' . $$; } my $_running_inside_eval = $^S; local $SIG{__DIE__} = sub { if (!defined $^S || $^S) { if ( ($_has_threads && $_tid != 0) || $ENV{'PERL_IPERL_RUNNING'} || $_running_inside_eval ) { # thread env or running inside IPerl, check stack trace my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//; if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / || $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ ) { CORE::die(@_); } } else { # normal env, trust $^S CORE::die(@_); } } local $SIG{__DIE__}; local $\ = undef; my $_die_msg = (defined $_[0]) ? $_[0] : ''; print {*STDERR} $_die_msg; $self->exit(255, $_die_msg, $self->{_chunk_id}); }; ## Use options from user_tasks if defined. $self->{max_workers} = $_task->{max_workers} if ($_task->{max_workers}); $self->{chunk_size} = $_task->{chunk_size} if ($_task->{chunk_size}); $self->{gather} = $_task->{gather} if ($_task->{gather}); $self->{sequence} = $_task->{sequence} if ($_task->{sequence}); $self->{bounds_only} = $_task->{bounds_only} if ($_task->{bounds_only}); $self->{task_name} = $_task->{task_name} if ($_task->{task_name}); $self->{user_args} = $_task->{user_args} if ($_task->{user_args}); $self->{user_begin} = $_task->{user_begin} if ($_task->{user_begin}); $self->{user_func} = $_task->{user_func} if ($_task->{user_func}); $self->{user_end} = $_task->{user_end} if ($_task->{user_end}); ## Init runtime vars. Obtain handle to lock files. my $_chn; if (defined $_params && exists $_params->{_chn}) { $_chn = $self->{_chn} = delete $_params->{_chn}; } else { $_chn = $self->{_chn} = $_wid % $self->{_data_channels} + 1; } ## Choose locks for DATA channels. $self->{_com_lock} = $self->{'_mutex_0'}; $self->{_dat_lock} = $self->{'_mutex_'.$_chn} if ($self->{_lock_chn}); ## Delete attributes no longer required after being spawned. delete @{ $self }{ qw( flush_file flush_stderr flush_stdout stderr_file stdout_file on_post_exit on_post_run user_data user_error user_output _pids _state _status _thrs _tids ) }; ## Call MCE::Shared's init routine if present; enables parallel IPC. ## For threads, init is called automatically via the CLONE feature. MCE::Shared::init($_wid) if (!$_use_threads && $INC{'MCE/Shared.pm'}); _do_send_init($self); ## Call module's worker_init routine for modules plugged into MCE. for my $_p (@{ $_plugin_worker_init }) { $_p->($self); } ## Begin processing if worker was added during processing. Otherwise, ## respond back to the main process if the last worker spawned. if (defined $_params) { _worker_do($self, $_params); undef $_params; } ## Enter worker loop. _worker_loop($self); ## Clear worker session. _do_send_clear($self); $self->{_com_lock} = undef; $self->{_dat_lock} = undef; return; } 1;