bitbake: cooker: parse using bb.compat.Pool

(Bitbake rev: 8af519a49a3374bd9004864ef31ca8aa328e9f34)

Signed-off-by: Christopher Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Christopher Larson 2013-02-12 12:28:47 -05:00 committed by Richard Purdie
parent dde7a48135
commit 325410e2b2
1 changed files with 26 additions and 133 deletions

View File

@ -34,7 +34,7 @@ from cStringIO import StringIO
from contextlib import closing
from functools import wraps
from collections import defaultdict
import bb, bb.exceptions, bb.command
import bb, bb.exceptions, bb.command, bb.compat
from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
import Queue
import prserv.serv
@ -1556,87 +1556,19 @@ class ParsingFailure(Exception):
self.recipe = recipe
Exception.__init__(self, realexception, recipe)
class Feeder(multiprocessing.Process):
def __init__(self, jobs, to_parsers, quit):
self.quit = quit
self.jobs = jobs
self.to_parsers = to_parsers
multiprocessing.Process.__init__(self)
def run(self):
while True:
try:
quit = self.quit.get_nowait()
except Queue.Empty:
pass
else:
if quit == 'cancel':
self.to_parsers.cancel_join_thread()
break
try:
job = self.jobs.pop()
except IndexError:
break
try:
self.to_parsers.put(job, timeout=0.5)
except Queue.Full:
self.jobs.insert(0, job)
continue
class Parser(multiprocessing.Process):
def __init__(self, jobs, results, quit, init):
self.jobs = jobs
self.results = results
self.quit = quit
self.init = init
multiprocessing.Process.__init__(self)
def run(self):
if self.init:
self.init()
pending = []
while True:
try:
self.quit.get_nowait()
except Queue.Empty:
pass
else:
self.results.cancel_join_thread()
break
if pending:
result = pending.pop()
else:
try:
job = self.jobs.get(timeout=0.25)
except Queue.Empty:
continue
if job is None:
break
result = self.parse(*job)
try:
self.results.put(result, timeout=0.25)
except Queue.Full:
pending.append(result)
def parse(self, filename, appends, caches_array):
try:
return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
except Exception as exc:
tb = sys.exc_info()[2]
exc.recipe = filename
exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
return True, exc
# Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
# and for example a worker thread doesn't just exit on its own in response to
# a SystemExit event for example.
except BaseException as exc:
return True, ParsingFailure(exc, filename)
def parse_file((filename, appends, caches_array)):
try:
return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
except Exception as exc:
tb = sys.exc_info()[2]
exc.recipe = filename
exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
return True, exc
# Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
# and for example a worker thread doesn't just exit on its own in response to
# a SystemExit event for example.
except BaseException as exc:
return True, ParsingFailure(exc, filename)
class CookerParser(object):
def __init__(self, cooker, filelist, masked):
@ -1670,32 +1602,25 @@ class CookerParser(object):
self.fromcache.append((filename, appends))
self.toparse = self.total - len(self.fromcache)
self.progress_chunk = max(self.toparse / 100, 1)
self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
self.start()
self.haveshutdown = False
def start(self):
self.results = self.load_cached()
self.processes = []
if self.toparse:
def process_init():
parse_file.cfg = self.cfgdata
multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
def init():
Parser.cfg = self.cfgdata
multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1)
self.feeder_quit = multiprocessing.Queue(maxsize=1)
self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
self.result_queue = multiprocessing.Queue()
self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
self.feeder.start()
for i in range(0, self.num_processes):
parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
parser.start()
self.processes.append(parser)
self.results = itertools.chain(self.results, self.parse_generator())
self.pool = bb.compat.Pool(self.num_processes, process_init)
parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk)
self.pool.close()
self.results = itertools.chain(self.results, parsed)
def shutdown(self, clean=True, force=False):
if not self.toparse:
@ -1711,25 +1636,9 @@ class CookerParser(object):
self.total)
bb.event.fire(event, self.cfgdata)
self.feeder_quit.put(None)
for process in self.processes:
self.jobs.put(None)
else:
self.feeder_quit.put('cancel')
self.parser_quit.cancel_join_thread()
for process in self.processes:
self.parser_quit.put(None)
self.jobs.cancel_join_thread()
for process in self.processes:
if force:
process.join(.1)
process.terminate()
else:
process.join()
self.feeder.join()
self.pool.terminate()
self.pool.join()
sync = threading.Thread(target=self.bb_cache.sync)
sync.start()
@ -1742,22 +1651,6 @@ class CookerParser(object):
cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
yield not cached, infos
def parse_generator(self):
while True:
if self.parsed >= self.toparse:
break
try:
result = self.result_queue.get(timeout=0.25)
except Queue.Empty:
pass
else:
value = result[1]
if isinstance(value, BaseException):
raise value
else:
yield result
def parse_next(self):
result = []
parsed = None