lib/oe/utils: Add utils function for multiprocess execution

Our usage of multitprocessing is problematic. In particular, there is a bug
in python 2.7 multiprocessing where signals are not handled until command
completion instead of immediately.

This factors the multiprocess code into a function which is enhanced with
a workaround to ensure immediate signal handling and also better SIGINT
handling which should happen in the parent, not the children to ensure
clean exits. The workaround for the signals is being added to the core
bb.utils function so it can benefit all users.

package_manager is then converted to use the new code.

(From OE-Core rev: 72d153a3a90d31d9f4e41d77da24e44ccb33c56e)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2014-08-21 21:46:28 +01:00
parent 6ab380ddde
commit be1b198076
2 changed files with 33 additions and 28 deletions

View File

@ -7,6 +7,7 @@ import multiprocessing
import re
import bb
import tempfile
import oe.utils
# this can be used by all PM backends to create the index files in parallel
@ -116,16 +117,7 @@ class RpmIndexer(Indexer):
bb.note("There are no packages in %s" % self.deploy_dir)
return
nproc = multiprocessing.cpu_count()
pool = bb.utils.multiprocessingpool(nproc)
results = list(pool.imap(create_index, index_cmds))
pool.close()
pool.join()
for result in results:
if result is not None:
return(result)
oe.utils.multiprocess_exec(index_cmds, create_index)
class OpkgIndexer(Indexer):
def write_index(self):
@ -161,15 +153,7 @@ class OpkgIndexer(Indexer):
bb.note("There are no packages in %s!" % self.deploy_dir)
return
nproc = multiprocessing.cpu_count()
pool = bb.utils.multiprocessingpool(nproc)
results = list(pool.imap(create_index, index_cmds))
pool.close()
pool.join()
for result in results:
if result is not None:
return(result)
oe.utils.multiprocess_exec(index_cmds, create_index)
class DpkgIndexer(Indexer):
@ -210,15 +194,7 @@ class DpkgIndexer(Indexer):
bb.note("There are no packages in %s" % self.deploy_dir)
return
nproc = multiprocessing.cpu_count()
pool = bb.utils.multiprocessingpool(nproc)
results = list(pool.imap(create_index, index_cmds))
pool.close()
pool.join()
for result in results:
if result is not None:
return(result)
oe.utils.multiprocess_exec(index_cmds, create_index)
class PkgsList(object):

View File

@ -151,3 +151,32 @@ def execute_pre_post_process(d, cmds):
if cmd != '':
bb.note("Executing %s ..." % cmd)
bb.build.exec_func(cmd, d)
def multiprocess_exec(commands, function):
import signal
import multiprocessing
if not commands:
return []
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
nproc = min(multiprocessing.cpu_count(), len(commands))
pool = bb.utils.multiprocessingpool(nproc, init_worker)
imap = pool.imap(function, commands)
try:
results = list(imap)
pool.close()
pool.join()
results = []
for result in results:
if result is not None:
results.append(result)
return results
except KeyboardInterrupt:
pool.terminate()
pool.join()
raise