Native parallel PHP job queue
To make use of multiple cores for some rather long processing operations, I have lately needed, several times, a way to fork multiple workers from a single PHP script. So I created a small project on github which implements this in a way that should be reusable by anybody. This is far from rocket science, but it still might be useful to someone out there.
Native job queue
I also implemented a ShellJobProvider (an implementation of JobProvider). It is constructed from an array of shell commands, which are then executed in parallel. A simple working example:
<?php
require 'njq/environment.php';
$executor = new \njq\Executor();
$executor->run(
new \njq\ShellJobProvider( array(
'echo 1 >> test',
'echo 2 >> test',
'echo 3 >> test',
'echo 4 >> test',
'echo 5 >> test',
) ),
4
);
?>
The file test will then contain something like (the order might vary):
5
4
3
2
1
The 4 (the second parameter of \njq\Executor::run) defines the number of parallel processes to spawn. In most cases this should not exceed the number of available cores.
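To illustrate what limiting the number of parallel processes means at the PCNTL level, here is a minimal sketch. It is not the code from njq: the function name runParallel and its return value are made up for this illustration. It only assumes the PCNTL extension on the command line and shows the general pattern of forking one child per command while never keeping more than a given number of children alive.

```php
<?php
// Hypothetical sketch, NOT njq's actual implementation: run shell commands
// with at most $parallel forked children alive at any time.
// Requires the PCNTL extension (CLI only). Returns the number of failed commands.
function runParallel(array $commands, $parallel)
{
    $running = 0;
    $failed  = 0;

    // Block until one child exits and record whether it failed.
    $reap = function () use (&$running, &$failed) {
        pcntl_wait($status);
        $running--;
        if (!pcntl_wifexited($status) || pcntl_wexitstatus($status) !== 0) {
            $failed++;
        }
    };

    foreach ($commands as $command) {
        // All slots are busy: wait for a free one before forking again.
        if ($running >= $parallel) {
            $reap();
        }

        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('Could not fork.');
        }
        if ($pid === 0) {
            // Child process: run exactly one command, then exit with its status.
            passthru($command, $exitCode);
            exit($exitCode);
        }

        // Parent process: one more child is running now.
        $running++;
    }

    // Collect the children that are still running.
    while ($running > 0) {
        $reap();
    }

    return $failed;
}
```

Called as runParallel( array( 'echo 1 >> test', 'echo 2 >> test' ), 4 ), this would append both lines to the file test in whatever order the children happen to finish, which is why the order of the output above might vary.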
Requirements
The job queue requires PHP 5.3 and the PHP PCNTL extension.
Get it
I like github for dropping projects I do not want to maintain very actively, so this small piece of code is also hosted on github: http://github.com/kore/njq - happy forking. It, of course, has documentation and tests - as always.
Comments
Lars Johansson at Sat, 08 May 2010 18:18:55 +0200
Hey Core,
I like the way you use pcntl_fork a lot, nice clean code.
I do a lot of PHP parallel processing, mostly in 'map and reduce' style. Oddly enough, in a job scheduler written in PHP. Below is an example of a BOM explosion job that forks on two levels. The inner level is a map and reduce fork. And it really pays off. The speedup is roughly 'proportional' to the number of processes (up to about 12 processes for this job on my machine).
<?xml version='1.0' standalone='yes'?>
<schedule name='bomit' mustcomplete='yes' logresult='yes' period='day' notify='admfail.xml' logmsg='Explode BOMs'>
<variant><name>DBPFX</name><validate>DB3,DB9</validate><default>DB3</default></variant>
<tag><name>C_DB</name>
<function>if ("@DBPFX"=='DB3') return 'ACTADW'; elseif ("@DBPFX"=='DB9') return 'CPDDW'; else return FALSE;</function>
</tag>
<job name='Design_BOMS' type='dummy' parallel='yes'> <!-- Top level job, may run in parallel with adjacent top jobs -->
<tag><name>BOMTABLE</name><value>@DBPFX_BOM_BASIC</value></tag>
<tag><name>FILE</name><value>@BOMTABLE.TXT</value></tag>
<job name='createTopNodes' type='sql' logmsg='Create a driver array for job explode'>
<tag><name>TOPTABLE</name><value>@BOMTABLE_TOP</value></tag>
<sql name='create_@TOPTABLE'>
USE @C_DB;
DROP TABLE IF EXISTS @TOPTABLE;
create table @TOPTABLE
(select P_PLANT, P_MATNR, MIN(ALT_BOM) AS ALT_BOM, BOM_USE from @BOMTABLE
group by P_PLANT, P_MATNR, BOM_USE
order by P_PLANT, P_MATNR,ALT_BOM,BOM_USE);
</sql>
<sql name='create_driver'>
USE @C_DB;
select * from @TOPTABLE;
</sql>
</job>
<job name='explode' type='script' pgm='bomitBasic.php' db='@C_DB' filestem='@FILE' bomtable='@BOMTABLE'>
<forevery name='@J_createTopNodes/driver0' chunksize='1000' parallel='6' rowdirectory='no'>
<!-- run the job once for each 1000 elements in array but no more than 6 in parallel -->
<exit pgm='reduceDriver_default.php' append='@FILE' deletefiles='yes'/>
</forevery>
</job>
<job name='loadit' type='sql' logmsg='Load the result into MySQL'>
<tag><name>BOMTREETABLE</name><value>@BOMTABLE_TREE</value></tag>
<tag><name>INFILE</name><value>@J_explode/@FILE</value></tag>
<sql name='load_tree_table'>
USE @C_DB;
LOAD DATA LOCAL INFILE '@INFILE'
REPLACE
INTO TABLE @BOMTREETABLE
FIELDS
TERMINATED BY ';'
IGNORE 0 LINES
;
</sql>
</job>
</job>
</schedule>
Hyh at Sun, 09 May 2010 08:51:24 +0200
Where are you using multiple cores?
Shyam at Fri, 02 Jul 2010 10:35:07 +0200
Thank you, this saved me a lot of trouble, very nicely done.