bitbake: Revert "cooker: parse using bb.compat.Pool"

Revert the pool changes: terminate() does not work reliably on
bb.compat.Pool :(

[YOCTO #3978]

This reverts commit 8af519a49a3374bd9004864ef31ca8aa328e9f34.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Richard Purdie 2013-03-06 15:32:35 +00:00
parent ed76a48e68
commit 164a4cb2fc
1 changed file with 133 additions and 26 deletions


@@ -34,7 +34,7 @@ from cStringIO import StringIO
 from contextlib import closing
 from functools import wraps
 from collections import defaultdict
-import bb, bb.exceptions, bb.command, bb.compat
+import bb, bb.exceptions, bb.command
 from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
 import Queue
 import prserv.serv
@@ -1556,19 +1556,87 @@ class ParsingFailure(Exception):
         self.recipe = recipe
         Exception.__init__(self, realexception, recipe)
 
-def parse_file((filename, appends, caches_array)):
-    try:
-        return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
-    except Exception as exc:
-        tb = sys.exc_info()[2]
-        exc.recipe = filename
-        exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
-        return True, exc
-    # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
-    # and for example a worker thread doesn't just exit on its own in response to
-    # a SystemExit event for example.
-    except BaseException as exc:
-        return True, ParsingFailure(exc, filename)
+class Feeder(multiprocessing.Process):
+    def __init__(self, jobs, to_parsers, quit):
+        self.quit = quit
+        self.jobs = jobs
+        self.to_parsers = to_parsers
+        multiprocessing.Process.__init__(self)
+
+    def run(self):
+        while True:
+            try:
+                quit = self.quit.get_nowait()
+            except Queue.Empty:
+                pass
+            else:
+                if quit == 'cancel':
+                    self.to_parsers.cancel_join_thread()
+                break
+
+            try:
+                job = self.jobs.pop()
+            except IndexError:
+                break
+
+            try:
+                self.to_parsers.put(job, timeout=0.5)
+            except Queue.Full:
+                self.jobs.insert(0, job)
+                continue
+
+class Parser(multiprocessing.Process):
+    def __init__(self, jobs, results, quit, init):
+        self.jobs = jobs
+        self.results = results
+        self.quit = quit
+        self.init = init
+        multiprocessing.Process.__init__(self)
+
+    def run(self):
+        if self.init:
+            self.init()
+
+        pending = []
+        while True:
+            try:
+                self.quit.get_nowait()
+            except Queue.Empty:
+                pass
+            else:
+                self.results.cancel_join_thread()
+                break
+
+            if pending:
+                result = pending.pop()
+            else:
+                try:
+                    job = self.jobs.get(timeout=0.25)
+                except Queue.Empty:
+                    continue
+
+                if job is None:
+                    break
+                result = self.parse(*job)
+
+            try:
+                self.results.put(result, timeout=0.25)
+            except Queue.Full:
+                pending.append(result)
+
+    def parse(self, filename, appends, caches_array):
+        try:
+            return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
+        except Exception as exc:
+            tb = sys.exc_info()[2]
+            exc.recipe = filename
+            exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
+            return True, exc
+        # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
+        # and for example a worker thread doesn't just exit on its own in response to
+        # a SystemExit event for example.
+        except BaseException as exc:
+            return True, ParsingFailure(exc, filename)
 
 class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
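
The Feeder/Parser pair restored above is a hand-rolled producer/consumer pipeline: one Feeder process drains the job list onto a bounded queue, several Parser processes pull jobs and push results, and every blocking call uses a short timeout so each process stays responsive to a quit request. The standalone sketch below illustrates that pattern; the class names mirror the diff, but the trivial parse() stand-in, the queue sizes, and the Python 3 spelling (queue.Empty/queue.Full instead of Queue.Empty/Queue.Full) are illustrative assumptions, not bitbake code.

    # Standalone sketch of the feeder/parser pattern (illustrative only).
    import multiprocessing
    import queue

    def parse(job):
        # Stand-in for bb.cache.Cache.parse(): pretend to parse the job.
        return True, "parsed %s" % job

    class Feeder(multiprocessing.Process):
        def __init__(self, jobs, to_parsers, quit):
            multiprocessing.Process.__init__(self)
            self.jobs = jobs              # plain list of pending jobs
            self.to_parsers = to_parsers  # bounded queue feeding the parsers
            self.quit = quit              # queue used to request shutdown

        def run(self):
            while True:
                try:
                    quit = self.quit.get_nowait()
                except queue.Empty:
                    pass
                else:
                    if quit == 'cancel':
                        self.to_parsers.cancel_join_thread()
                    break
                try:
                    job = self.jobs.pop()
                except IndexError:
                    break                        # no work left, exit naturally
                try:
                    self.to_parsers.put(job, timeout=0.5)
                except queue.Full:
                    self.jobs.insert(0, job)     # queue full, retry later

    class Parser(multiprocessing.Process):
        def __init__(self, jobs, results, quit):
            multiprocessing.Process.__init__(self)
            self.jobs = jobs
            self.results = results
            self.quit = quit

        def run(self):
            pending = []
            while True:
                try:
                    self.quit.get_nowait()
                except queue.Empty:
                    pass
                else:
                    self.results.cancel_join_thread()
                    break
                if pending:
                    result = pending.pop()
                else:
                    try:
                        job = self.jobs.get(timeout=0.25)
                    except queue.Empty:
                        continue
                    if job is None:              # sentinel: clean shutdown
                        break
                    result = parse(job)
                try:
                    self.results.put(result, timeout=0.25)
                except queue.Full:
                    pending.append(result)       # consumer is slow, retry next loop

    if __name__ == '__main__':
        num_processes = 2
        willparse = ["recipe-%d.bb" % i for i in range(6)]

        feeder_quit = multiprocessing.Queue(maxsize=1)
        parser_quit = multiprocessing.Queue(maxsize=num_processes)
        jobs = multiprocessing.Queue(maxsize=num_processes)
        result_queue = multiprocessing.Queue()

        feeder = Feeder(willparse, jobs, feeder_quit)
        feeder.start()
        processes = []
        for i in range(num_processes):
            p = Parser(jobs, result_queue, parser_quit)
            p.start()
            processes.append(p)

        for _ in willparse:
            print(result_queue.get())            # one result per submitted job

        # Clean shutdown: one sentinel per parser, then join everything.
        feeder_quit.put(None)
        for _ in processes:
            jobs.put(None)
        feeder.join()
        for p in processes:
            p.join()

The short put/get timeouts are the load-bearing design choice: no process ever blocks indefinitely, so both the clean and the forced shutdown paths can always get a worker's attention.
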
@@ -1602,25 +1670,32 @@ class CookerParser(object):
                 self.fromcache.append((filename, appends))
         self.toparse = self.total - len(self.fromcache)
         self.progress_chunk = max(self.toparse / 100, 1)
-        self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
 
         self.start()
         self.haveshutdown = False
 
     def start(self):
         self.results = self.load_cached()
+        self.processes = []
 
         if self.toparse:
-            def process_init():
-                parse_file.cfg = self.cfgdata
-                multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
-                multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
-
             bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
+            def init():
+                Parser.cfg = self.cfgdata
+                multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
+                multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1)
 
-            self.pool = bb.compat.Pool(self.num_processes, process_init)
-            parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk)
-            self.pool.close()
-
-            self.results = itertools.chain(self.results, parsed)
+            self.feeder_quit = multiprocessing.Queue(maxsize=1)
+            self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
+            self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
+            self.result_queue = multiprocessing.Queue()
+            self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
+            self.feeder.start()
+            for i in range(0, self.num_processes):
+                parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
+                parser.start()
+                self.processes.append(parser)
+
+            self.results = itertools.chain(self.results, self.parse_generator())
 
     def shutdown(self, clean=True, force=False):
         if not self.toparse:
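
For contrast, the pool-based code removed in this hunk followed the standard multiprocessing.Pool idiom: create the pool with an initializer, feed the work list through imap_unordered(), close the pool, and chain the lazy result iterator onto the already-cached results. A minimal sketch of that idiom, with plain multiprocessing.Pool standing in for bb.compat.Pool and a dummy parse function (both assumptions, not bitbake code):

    # Sketch of the Pool-based idiom this revert removes (illustrative only).
    import itertools
    import multiprocessing

    def process_init():
        # In bitbake this attached the parsed configuration to parse_file and
        # registered Finalize hooks to save caches when a worker exits.
        pass

    def parse_file(filename):
        return True, "parsed %s" % filename

    if __name__ == '__main__':
        willparse = ["recipe-%d.bb" % i for i in range(6)]
        cached_results = iter([])            # stands in for self.load_cached()

        pool = multiprocessing.Pool(2, process_init)
        parsed = pool.imap_unordered(parse_file, willparse, chunksize=1)
        pool.close()                         # no more work will be submitted

        results = itertools.chain(cached_results, parsed)
        for result in results:
            print(result)
        pool.join()                          # cancellation would call pool.terminate() instead
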
@@ -1636,9 +1711,25 @@ class CookerParser(object):
                                             self.total)
             bb.event.fire(event, self.cfgdata)
+            self.feeder_quit.put(None)
+            for process in self.processes:
+                self.jobs.put(None)
         else:
-            self.pool.terminate()
-            self.pool.join()
+            self.feeder_quit.put('cancel')
+
+            self.parser_quit.cancel_join_thread()
+            for process in self.processes:
+                self.parser_quit.put(None)
+
+            self.jobs.cancel_join_thread()
+
+        for process in self.processes:
+            if force:
+                process.join(.1)
+                process.terminate()
+            else:
+                process.join()
+        self.feeder.join()
 
         sync = threading.Thread(target=self.bb_cache.sync)
         sync.start()
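
The restored shutdown path gives the cooker direct control over each worker: on a clean shutdown every parser receives a None sentinel and is joined; on a forced shutdown each process gets a brief join(.1) grace period and is then terminated outright, the kind of direct control the commit message says Pool.terminate() was not providing reliably. A tiny standalone illustration of the forced path, using a dummy worker rather than bitbake's parsers:

    # Forced-shutdown sketch: short grace period, then terminate() (illustrative only).
    import multiprocessing
    import time

    def stuck_worker():
        while True:              # simulates a parser that never notices the quit request
            time.sleep(1)

    if __name__ == '__main__':
        processes = [multiprocessing.Process(target=stuck_worker) for _ in range(2)]
        for p in processes:
            p.start()

        force = True
        for p in processes:
            if force:
                p.join(.1)       # give the worker a moment to exit on its own
                p.terminate()    # then signal that specific process directly
            else:
                p.join()         # clean path: wait for the worker to finish
        for p in processes:
            p.join()             # reap the terminated workers
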
@@ -1651,6 +1742,22 @@ class CookerParser(object):
                 cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
                 yield not cached, infos
 
+    def parse_generator(self):
+        while True:
+            if self.parsed >= self.toparse:
+                break
+
+            try:
+                result = self.result_queue.get(timeout=0.25)
+            except Queue.Empty:
+                pass
+            else:
+                value = result[1]
+                if isinstance(value, BaseException):
+                    raise value
+                else:
+                    yield result
+
     def parse_next(self):
         result = []
         parsed = None
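
The parse_generator() restored above polls the result queue with a short timeout until enough results have arrived, and re-raises any exception object that a worker shipped back in place of a result, so worker-side failures surface in the consuming code. A minimal standalone sketch of that consumer side, with hypothetical names and a fixed expected count standing in for self.parsed/self.toparse:

    # Sketch of a result-draining generator that re-raises worker exceptions
    # (illustrative; names and the fixed "expected" count are assumptions).
    import multiprocessing
    import queue

    def result_generator(result_queue, expected):
        received = 0
        while received < expected:
            try:
                result = result_queue.get(timeout=0.25)
            except queue.Empty:
                continue
            value = result[1]
            if isinstance(value, BaseException):
                raise value          # surface the worker-side failure here
            received += 1
            yield result

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        q.put((True, "recipe-a.bb parsed"))
        q.put((True, "recipe-b.bb parsed"))
        for ok, info in result_generator(q, expected=2):
            print(ok, info)
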