cooker: use futures rather than a multiprocessing pool

This avoids some silent parser hangs we were seeing which were
near impossible to debug as no user feedback was given.

[RP: Tweak commit message]
(Bitbake rev: d104f29871c04a5a36600a35b2568b49e5b21ca0)

Signed-off-by: Christopher Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Christopher Larson 2012-01-08 23:21:27 -06:00 committed by Richard Purdie
parent 299fa3489b
commit 0779a90e2a
1 changed files with 18 additions and 11 deletions

View File

@ -32,6 +32,7 @@ import sre_constants
import threading
from cStringIO import StringIO
from contextlib import closing
from concurrent import futures
from functools import wraps
from collections import defaultdict
import bb, bb.exceptions, bb.command
@ -1462,20 +1463,16 @@ class CookerParser(object):
self.start()
def start(self):
def init(cfg):
parse_file.cfg = cfg
multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cooker.configuration.data, ), exitpriority=1)
self.results = self.load_cached()
if self.toparse:
bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata])
parsed = self.pool.imap(parse_file, self.willparse)
self.pool.close()
self.results = itertools.chain(self.results, parsed)
parse_file.cfg = self.cfgdata
multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
self.executor = futures.ProcessPoolExecutor(max_workers=self.num_processes)
self.futures = dict((self.executor.submit(parse_file, task), task) for task in self.willparse)
self.results = itertools.chain(self.results, self.parse_gen())
def shutdown(self, clean=True):
if not self.toparse:
@ -1488,8 +1485,9 @@ class CookerParser(object):
self.total)
bb.event.fire(event, self.cfgdata)
else:
self.pool.terminate()
self.pool.join()
for future in self.futures:
future.cancel()
self.executor.shutdown()
sync = threading.Thread(target=self.bb_cache.sync)
sync.start()
@ -1501,6 +1499,15 @@ class CookerParser(object):
cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
yield not cached, infos
def parse_gen(self):
for future in futures.as_completed(self.futures):
task = self.futures[future]
exc = future.exception()
if exc:
raise exc
else:
yield future.result()
def parse_next(self):
try:
parsed, result = self.results.next()