Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ HMCI supports inserting imported data into:
* Files
- CSV

## Parallelization

HMCI supports parallelization of imports, whereby each `process_item()` on the importer will run in a separate process. This is achieved by using the `pcntl_fork()` function. However, this is opt-in per importer as implementors must be careful to ensure that the `process_item()` does not rely on data populated by other calls to process_item(). For example, if `process_item()` adds to a global array, or array on the importer for a customer `$import_map`, then this data will not be available to other processes.

If your importer is thread-safe in this way, you should implement the `HMCI\Iterator\Thread_Sage` interface.

When importers are in in parallel, `wp_defer_term_counting()` is called, so `wp term count <taxonomy>` will need to be run after the import.

Pass `--threads=<number>` to the CLI command to specify the number of threads to use. For maximum performance, this should be atleast set to the number of CPU cores on the server. In many cases,
`process_item()` will be I/O bound so using a higher number of threads than cores can still have a positive impact.

## Migrating From Version 1

In Version 2 we changed the way canonical IDs are stored. This means that you will need to migrate your existing data to the new format, if you are planning to resume / to delta imports with data that was imported under Version 1.
Expand Down
4 changes: 4 additions & 0 deletions inc/classes/cli/class-hmci.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ class HMCI extends \WP_CLI_Command {
* [--thread_id=<id>]
* : Thread ID to keep a unique progress value per each, when threading.
*
* [--items_per_loop=<number>]
* : Number of items to be processed on a single loop, larger are more efficient but more memory intensive.
*
* [--threads=<number>]
* : Number of threads to use for processing, if the importer is thread safe.
*
* @subcommand import
*/
Expand Down
95 changes: 91 additions & 4 deletions inc/classes/iterator/class-base.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,96 @@ public function iterate_all() {
* @param $items
*/
public function iterate_items( $items ) {
if ( $this instanceof Thread_Safe ) {
return $this->iterate_items_with_fork( $items );
} else {
foreach ( $items as $item ) {
$r = $this->iterate_item( $item );
if ( is_wp_error( $r ) ) {
$this->debug( $r );
}
}
}
}

foreach ( $items as $item ) {
/**
* Process the items in parallel with forked processes.
*
* This should only be used if the iterator is thread safe (implements Thread_Safe interface)
* .
* @param mixed $items
* @return void
*/
protected function iterate_items_with_fork( $items ) {
// Safe tio defer term counting no matter what, as items will run in parallel.
wp_defer_term_counting( true );

$child_processes = [];
$max_processes = $this->get_args()['threads'] ?? 24;

$to_process = array_keys( array_values( $items ) );
while ( true ) {
foreach ( $child_processes as $item_number => $pid ) {
$res = pcntl_waitpid( $pid, $status, WNOHANG );
if ($res == -1) {
// Error occurred
\WP_CLI::error( 'An error occurred while checking child process status.' );
} elseif ( $res > 0 ) {
if ( pcntl_wifexited( $status ) ) {
$exist_status = pcntl_wexitstatus( $status );
unset( $child_processes[ $item_number ] );
if ( $exist_status === 0 ) {
//\WP_CLI::line( "Parent: Child process exited with status $exist_status." );
} else {
\WP_CLI::line( "Parent: Child process exited with filed status $exist_status, putting item back on job array." );
$to_process[] = $item_number;
}
} else {
\WP_CLI::error( "An error occurred while checking child process status." );
}
} else {
// Child process is still running
}
}

$r = $this->iterate_item( $item );
if ( empty( $to_process ) && count( $child_processes ) === 0 ) {
break;
}

if ( is_wp_error( $r ) ) {
$this->debug( $r );
if ( $to_process && count( $child_processes ) < $max_processes ) {
$process_item_number = reset( $to_process );
unset ( $to_process[0] );
$to_process = array_values( $to_process );

$pid = pcntl_fork();
if ( $pid === -1) {
\WP_CLI::error( 'Failed to fork child process' );
} else if ( $pid === 0) {
global $wp_object_cache;
global $wpdb;
foreach ( $wpdb->dbhs as $dbh => $connection ) {
$wpdb->disconnect( $dbh );
}
$wpdb->persistent = false;
$wp_object_cache = new \WP_Object_Cache();
if ( method_exists( $this, 'startup_thread' ) ) {
call_user_func( [ $this, 'startup_thread' ] );
}

$this->iterate_item( $items[ $process_item_number ] );
foreach ( $wpdb->dbhs as $dbh => $connection ) {
$wpdb->disconnect( $dbh );
}

if ( method_exists( $this, 'shutdown_thread' ) ) {
call_user_func( [ $this, 'shutdown_thread' ] );
}
exit;
} else {
$child_processes[ $process_item_number ] = $pid;
}
}
usleep( 1_000 );
}
}

Expand Down Expand Up @@ -205,6 +287,11 @@ public static function get_args() {
'type' => 'bool',
'description' => __( 'Attempt to resume script (if there was a failure during last execution)', 'hmci' ),
],
'threads' => [
'default' => 1,
'type' => 'numeric',
'description' => __( 'Number of threads to use for processing, if the iterator is thread safe', 'hmci' ),
],
];

return array_merge( $global_args, static::get_iterator_args(), static::get_custom_args(), static::get_importer_args(), static::get_validator_args() );
Expand Down
10 changes: 10 additions & 0 deletions inc/classes/iterator/class-thread-safe-interface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace HMCI\Iterator;

/**
* Interface to indicate an importer is Thread Safe.
*/
interface Thread_Safe {

}
15 changes: 15 additions & 0 deletions inc/classes/iterator/db/class-base.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ public function set_db() {
$this->database_connection = new \wpdb( $this->args['db_user'], $this->args['db_pass'], $this->args['db_name'], $this->args['db_host'] );
}

/**
* In cases when the importer supports threading, this method will be called before the thread is started
* to set up a new DB connection.
*/
public function startup_thread() {
$this->set_db();
}

/**
* In cases when the importer supports threading, this method will be called after the thread is finished
* to close the DB connection.
*/
public function shutdown_thread() {
$this->database_connection->close();
}
/**
* Get iterator argument definitions
*
Expand Down
1 change: 1 addition & 0 deletions plugin.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require_once( __DIR__ . '/inc/utils/namespace.php' );

require_once( __DIR__ . '/inc/classes/iterator/class-base-interface.php' );
require_once( __DIR__ . '/inc/classes/iterator/class-thread-safe-interface.php' );
require_once( __DIR__ . '/inc/classes/iterator/class-base.php' );

require_once( __DIR__ . '/inc/classes/iterator/db/class-base.php' );
Expand Down