From f711a25b6d1f2af030be479ce163b1bdc03f8f4a Mon Sep 17 00:00:00 2001 From: Eric Johnson Date: Wed, 2 Dec 2015 21:08:49 -0500 Subject: [PATCH] Refactor Rex::TaskList. No functional change. --- lib/Rex/Fork/Manager.pm | 9 +-- lib/Rex/Fork/Task.pm | 21 ++----- lib/Rex/TaskList/Base.pm | 70 ++++++++++++++++-------- lib/Rex/TaskList/Parallel_ForkManager.pm | 65 ++-------------------- lib/Rex/Transaction.pm | 2 + t/summary.t | 15 ++--- 6 files changed, 64 insertions(+), 118 deletions(-) diff --git a/lib/Rex/Fork/Manager.pm b/lib/Rex/Fork/Manager.pm index b382074f6..838b3a399 100644 --- a/lib/Rex/Fork/Manager.pm +++ b/lib/Rex/Fork/Manager.pm @@ -27,12 +27,9 @@ sub new { } sub add { - my ( $self, $coderef, $task, $server ) = @_; - my $f = Rex::Fork::Task->new( - coderef => $coderef, - task => $task, - server => $server, - ); + my ( $self, $coderef ) = @_; + + my $f = Rex::Fork::Task->new( coderef => $coderef ); push( @{ $self->{'forks'} }, $f ); diff --git a/lib/Rex/Fork/Task.pm b/lib/Rex/Fork/Task.pm index 5a79fe72a..9344573cc 100644 --- a/lib/Rex/Fork/Task.pm +++ b/lib/Rex/Fork/Task.pm @@ -12,11 +12,6 @@ use POSIX ":sys_wait_h"; # VERSION -BEGIN { - use Rex::Shared::Var; - share qw(@SUMMARY); -} - sub new { my $that = shift; my $proto = ref($that) || $that; @@ -31,20 +26,12 @@ sub new { sub start { my ($self) = @_; + $self->{'running'} = 1; - if ( $self->{pid} = fork ) { return $self->{pid}; } - else { - $self->{chld} = 1; - - eval { $self->{coderef}->($self) }; - my $exit_code = $@ ? ( ( $? >> 8 ) || 1 ) : 0; - push @SUMMARY, - { - task => $self->{task}->name, - server => $self->{server}, - exit_code => $exit_code, - }; + $self->{pid} = fork; + if ( !$self->{pid} ) { + $self->{coderef}->($self); $self->{'running'} = 0; exit(); } diff --git a/lib/Rex/TaskList/Base.pm b/lib/Rex/TaskList/Base.pm index d3e562942..0d4f7edbe 100644 --- a/lib/Rex/TaskList/Base.pm +++ b/lib/Rex/TaskList/Base.pm @@ -11,6 +11,11 @@ use warnings; # VERSION +BEGIN { + use Rex::Shared::Var; + share qw(@SUMMARY); +} + use Data::Dumper; use Rex::Logger; use Rex::Task; @@ -290,39 +295,21 @@ sub run { my ( $self, $task, %options ) = @_; my $fm = Rex::Fork::Manager->new( max => $self->get_thread_count($task) ); - my $task_name = $task->name; my $all_servers = $task->server; for my $server (@$all_servers) { - my $forked_sub = sub { - Rex::Logger::init(); - Rex::Logger::info("Running task $task_name on $server"); - - my $run_task = $task->clone; - my $return_value = $run_task->run( - $server, - in_transaction => $self->{IN_TRANSACTION}, - params => $options{params}, - args => $options{args}, - ); - - Rex::Logger::debug("Destroying all cached os information"); - Rex::Logger::shutdown(); - - return $return_value; - }; + my $child_coderef = $self->build_child_coderef($task, $server, %options); if ( $self->{IN_TRANSACTION} ) { # Inside a transaction -- no forking and no chance to get zombies. # This only happens if someone calls do_task() from inside a transaction. - # Note the result is not appended to @SUMMARY. - $forked_sub->(); + $child_coderef->(); } else { # Not inside a transaction, so lets fork # Add $forked_sub to the fork queue - $fm->add( $forked_sub, $task, $server->to_s ); + $fm->add($child_coderef); } } @@ -333,6 +320,43 @@ sub run { return $ret; } +sub build_child_coderef { + my ($self, $task, $server, %options) = @_; + + return sub { + Rex::Logger::init(); + Rex::Logger::info("Running task " . $task->name . " on $server"); + + my $return_value = eval { + $task->clone->run( + $server, + in_transaction => $self->{IN_TRANSACTION}, + params => $options{params}, + args => $options{args}, + ); + }; + + if ($self->{IN_TRANSACTION}) { + die $@ if $@; + } + else { + my $exit_code = $@ ? ( ( $? >> 8 ) || 1 ) : 0; + + push @SUMMARY, { + task => $task->name, + server => $server->to_s, + exit_code => $exit_code, + }; + } + + Rex::Logger::debug("Destroying all cached os information"); + Rex::Logger::shutdown(); + + + return $return_value; + }; +} + sub modify { my ( $self, $type, $task, $code, $package, $file, $line ) = @_; @@ -389,7 +413,7 @@ sub is_transaction { sub get_exit_codes { my ($self) = @_; - return map { $_->{exit_code} } @Rex::Fork::Task::SUMMARY; + return map { $_->{exit_code} } @SUMMARY; } sub get_thread_count { @@ -409,6 +433,6 @@ sub get_thread_count { return 1; } -sub get_summary { @Rex::Fork::Task::SUMMARY } +sub get_summary { @SUMMARY } 1; diff --git a/lib/Rex/TaskList/Parallel_ForkManager.pm b/lib/Rex/TaskList/Parallel_ForkManager.pm index 861e8f4b0..24609a126 100644 --- a/lib/Rex/TaskList/Parallel_ForkManager.pm +++ b/lib/Rex/TaskList/Parallel_ForkManager.pm @@ -21,30 +21,16 @@ use Rex::Report; use Time::HiRes qw(time); BEGIN { - use Rex::Shared::Var; - share qw(@SUMMARY); - use Rex::Require; Parallel::ForkManager->require; } use base qw(Rex::TaskList::Base); -sub new { - my $that = shift; - my $proto = ref($that) || $that; - my $self = $proto->SUPER::new(@_); - - bless( $self, $proto ); - - return $self; -} - sub run { my ( $self, $task, %options ) = @_; - my $fm = Parallel::ForkManager->new( $self->get_thread_count($task) ); - my $task_name = $task->name; + my $fm = Parallel::ForkManager->new( $self->get_thread_count($task) ); my $all_servers = $task->server; $fm->run_on_finish( @@ -55,44 +41,18 @@ sub run { ); for my $server (@$all_servers) { - my $forked_sub = sub { - Rex::Logger::init(); - Rex::Logger::info("Running task $task_name on $server"); - - my $run_task = $task->clone; - my $return_value = $run_task->run( - $server, - in_transaction => $self->{IN_TRANSACTION}, - params => $options{params}, - args => $options{args}, - ); - - Rex::Logger::debug("Destroying all cached os information"); - Rex::Logger::shutdown(); - - return $return_value; - }; + my $child_coderef = $self->build_child_coderef($task, $server, %options); if ( $self->{IN_TRANSACTION} ) { # Inside a transaction -- no forking and no chance to get zombies. # This only happens if someone calls do_task() from inside a transaction. - # Note the result is not appended to @SUMMARY. - $forked_sub->(); + $child_coderef->(); } else { # Not inside a transaction, so lets fork $fm->start and next; - - eval { $forked_sub->() }; - my $exit_code = $@ ? ( ( $? >> 8 ) || 1 ) : 0; - push @SUMMARY, - { - task => $task_name, - server => $server->to_s, - exit_code => $exit_code, - }; - + $child_coderef->(); $fm->finish; } } @@ -104,21 +64,4 @@ sub run { return $ret; } -sub get_exit_codes { - my ($self) = @_; - return map { $_->{exit_code} } @SUMMARY; -} - -sub get_summary { @SUMMARY } - -sub set_in_transaction { - my ( $self, $val ) = @_; - $self->{IN_TRANSACTION} = $val; -} - -sub is_transaction { - my ($self) = @_; - return $self->{IN_TRANSACTION}; -} - 1; diff --git a/lib/Rex/Transaction.pm b/lib/Rex/Transaction.pm index ce175c99e..2228ee675 100644 --- a/lib/Rex/Transaction.pm +++ b/lib/Rex/Transaction.pm @@ -100,6 +100,8 @@ sub transaction(&) { Rex::pop_connection(); } + Rex::TaskList->create()->set_in_transaction(0); + die("Transaction failed. Rollback done."); } diff --git a/t/summary.t b/t/summary.t index 2434d5989..ed52b362c 100644 --- a/t/summary.t +++ b/t/summary.t @@ -93,22 +93,19 @@ sub create_tasks { sub test_summary { my (%expected) = @_; + my @expected_summary; $Rex::TaskList::task_list = undef; create_tasks(); - my @summary; - my @expected_summary; - my $test_description; - for my $task_name ( Rex::TaskList->create->get_tasks ) { Rex::TaskList->run($task_name); - @summary = Rex::TaskList->create->get_summary; + my @summary = Rex::TaskList->create->get_summary; push @expected_summary, $expected{$task_name}; - $test_description = + my $test_description = $expected{$task_name}->{exit_code} == 0 ? "$task_name succeeded" : "$task_name failed"; @@ -119,11 +116,7 @@ sub test_summary { my $distributor = Rex::Config->get_distributor; no warnings; - @Rex::Fork::Task::SUMMARY = () - if $distributor eq 'Base'; - - @Rex::TaskList::Parallel_ForkManager::SUMMARY = () - if $distributor eq 'Parallel_ForkManager'; + @Rex::TaskList::Base::SUMMARY = (); } sub parallel_forkmanager_not_installed {