cooker: use a pool, abort on first parse error

(Bitbake rev: 9caf65e79f95fe0045e727391e974c4c1e7411ff)

Signed-off-by: Chris Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Authored by Chris Larson on 2010-12-07 13:00:22 -05:00; committed by Richard Purdie
parent e5624a4ed3
commit ac4d926f41
1 changed file with 64 additions and 88 deletions
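
The change replaces the hand-managed worker processes and queues with a multiprocessing.Pool: an initializer stashes the shared configuration on the worker function in each child process, results are consumed lazily through imap(), and cached entries are chained in front of the freshly parsed ones. A minimal standalone sketch of that pattern (illustration only, not BitBake code; parse_one, init_worker and the toy cfg dict are hypothetical):

from __future__ import print_function
import itertools
import multiprocessing
import signal

def parse_one(path):
    # parse_one.cfg is set in each worker by the pool initializer, so the
    # shared configuration is not pickled along with every task.
    return path, len(path) + parse_one.cfg["offset"]  # stand-in for real parsing

def init_worker(cfg):
    signal.signal(signal.SIGINT, signal.SIG_IGN)  # let the parent handle Ctrl-C
    parse_one.cfg = cfg

if __name__ == "__main__":
    cfg = {"offset": 1}                  # hypothetical shared configuration
    cached = [("cached.bb", 0)]          # entries that need no parsing
    to_parse = ["a.bb", "b.bb", "c.bb"]

    pool = multiprocessing.Pool(2, init_worker, [cfg])
    parsed = pool.imap(parse_one, to_parse)
    pool.close()

    # Cached and freshly parsed results are drained through one iterator,
    # mirroring itertools.chain(self.load_cached(), parsed) below.
    for name, value in itertools.chain(cached, parsed):
        print(name, value)
    pool.join()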

@@ -23,12 +23,13 @@
 from __future__ import print_function
 import sys, os, glob, os.path, re, time
+import atexit
+import itertools
 import logging
+import sre_constants
+import threading
 import multiprocessing
 import signal
-import atexit
-import sre_constants
-import threading
 from cStringIO import StringIO
 from contextlib import closing
 import bb
@@ -45,11 +46,6 @@ class MultipleMatches(Exception):
     Exception raised when multiple file matches are found
     """

-class ParsingErrorsFound(Exception):
-    """
-    Exception raised when parsing errors are found
-    """
-
 class NothingToBuild(Exception):
     """
     Exception raised when there is nothing to build
@@ -976,6 +972,10 @@ class CookerExit(bb.event.Event):
     def __init__(self):
         bb.event.Event.__init__(self)

+def parse_file(task):
+    filename, appends = task
+    return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg)
+
 class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
         self.filelist = filelist
@@ -993,113 +993,89 @@ class CookerParser(object):
         self.total = len(filelist)

         self.current = 0
-        self.bb_cache = None
-        self.task_queue = None
-        self.result_queue = None
-        self.fromcache = None
         self.num_processes = int(self.cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
                                   multiprocessing.cpu_count())

-    def launch_processes(self):
-        self.task_queue = multiprocessing.Queue()
-        self.result_queue = multiprocessing.Queue()
-
         self.bb_cache = bb.cache.Cache(self.cfgdata)
         self.fromcache = []
+        self.willparse = []
         for filename in self.filelist:
             appends = self.cooker.get_file_appends(filename)
             if not self.bb_cache.cacheValid(filename):
-                self.task_queue.put((filename, appends))
+                self.willparse.append((filename, appends))
             else:
                 self.fromcache.append((filename, appends))
         self.toparse = self.total - len(self.fromcache)
         self.progress_chunk = max(self.toparse / 100, 1)

-        def worker(input, output, cfgdata):
-            signal.signal(signal.SIGINT, signal.SIG_IGN)
-            for filename, appends in iter(input.get, 'STOP'):
-                try:
-                    infos = bb.cache.Cache.parse(filename, appends, cfgdata)
-                except bb.parse.ParseError as exc:
-                    output.put(exc)
-                else:
-                    output.put(infos)
+        self.start()

-        self.processes = []
-        for i in xrange(self.num_processes):
-            process = multiprocessing.Process(target=worker,
-                                              args=(self.task_queue,
-                                                    self.result_queue,
-                                                    self.cfgdata))
-            process.start()
-            self.processes.append(process)
+    def start(self):
+        def init(cfg):
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+            parse_file.cfg = cfg
+
+        bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
+
+        self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata])
+        parsed = self.pool.imap(parse_file, self.willparse)
+        self.pool.close()
+
+        self.results = itertools.chain(self.load_cached(), parsed)

     def shutdown(self, clean=True):
-        self.result_queue.close()
-        for process in self.processes:
-            if clean:
-                self.task_queue.put('STOP')
-            else:
-                process.terminate()
-        self.task_queue.close()
-        for process in self.processes:
-            process.join()
-
-        sync = threading.Thread(target=self.bb_cache.sync)
-        sync.start()
-        atexit.register(lambda: sync.join())
-        codesync = threading.Thread(target=bb.codeparser.parser_cache_save(self.cooker.configuration.data))
-        codesync.start()
-        atexit.register(lambda: codesync.join())
-
-        if self.error > 0:
-            raise ParsingErrorsFound()
-
-    def parse_next(self):
-        if self.current >= self.total:
+        if clean:
             event = bb.event.ParseCompleted(self.cached, self.parsed,
                                             self.skipped, self.masked,
                                             self.virtuals, self.error,
                                             self.total)
             bb.event.fire(event, self.cfgdata)
+        else:
+            self.pool.terminate()
+        self.pool.join()
+
+        sync = threading.Thread(target=self.bb_cache.sync)
+        sync.start()
+        atexit.register(lambda: sync.join())
+
+        codesync = threading.Thread(target=bb.codeparser.parser_cache_save(self.cooker.configuration.data))
+        codesync.start()
+        atexit.register(lambda: codesync.join())
+
+    def load_cached(self):
+        for filename, appends in self.fromcache:
+            cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
+            yield not cached, infos
+
+    def parse_next(self):
+        try:
+            parsed, result = self.results.next()
+        except StopIteration:
+            self.shutdown()
             return False
-        elif not self.bb_cache:
-            self.bb_cache = bb.cache.Cache(self.cfgdata)
-            self.launch_processes()
-            bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
-            return True
-
-        try:
-            if self.result_queue.empty() and self.fromcache:
-                filename, appends = self.fromcache.pop()
-                _, result = self.bb_cache.load(filename, appends, self.cfgdata)
-                parsed = False
-                self.cached += 1
-            else:
-                result = self.result_queue.get()
-                if isinstance(result, Exception):
-                    raise result
-                parsed = True
-                self.parsed += 1
-                if self.parsed % self.progress_chunk == 0:
-                    bb.event.fire(bb.event.ParseProgress(self.parsed),
-                                  self.cfgdata)
         except KeyboardInterrupt:
             self.shutdown(clean=False)
             raise
-        except Exception as e:
-            self.error += 1
-            parselog.critical(str(e))
-        else:
-            self.virtuals += len(result)
-            for virtualfn, info in result:
-                if info.skipped:
-                    self.skipped += 1
-                else:
-                    self.bb_cache.add_info(virtualfn, info, self.cooker.status,
-                                           parsed=parsed)
+        except Exception as exc:
+            self.shutdown(clean=False)
+            sys.exit(1)

         self.current += 1
+        self.virtuals += len(result)
+        if parsed:
+            self.parsed += 1
+            if self.parsed % self.progress_chunk == 0:
+                bb.event.fire(bb.event.ParseProgress(self.parsed),
+                              self.cfgdata)
+        else:
+            self.cached += 1
+
+        for virtualfn, info in result:
+            if info.skipped:
+                self.skipped += 1
+            else:
+                self.bb_cache.add_info(virtualfn, info, self.cooker.status,
+                                       parsed=parsed)
         return True

     def reparse(self, filename):
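
The "abort on first parse error" part of the change relies on how multiprocessing.Pool reports failures: an exception raised inside a worker is pickled and re-raised in the parent when the corresponding result is fetched from the imap() iterator, which is what lets the new parse_next() catch it and exit immediately. A small standalone sketch of that behaviour (hypothetical work() function, not BitBake code):

from __future__ import print_function
import multiprocessing
import sys

def work(n):
    if n == 2:
        raise ValueError("parse error in item %d" % n)  # simulated parse failure
    return n * n

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    results = pool.imap(work, range(5))
    pool.close()
    try:
        for value in results:
            print(value)    # the ValueError is re-raised here when item 2 is reached
    except Exception:
        pool.terminate()    # unclean shutdown, analogous to shutdown(clean=False)
        pool.join()
        sys.exit(1)
    pool.join()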