Implement parallel parsing support

This utilizes Python's multiprocessing module.  The default number of parse
threads used is the same as the number of available processor cores; however,
you can manually set this with the BB_NUMBER_PARSE_THREADS variable.

(Bitbake rev: c7b3ec819549e51e438d293969e205883fee725f)

Signed-off-by: Chris Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
This commit is contained in:
Chris Larson 2010-11-18 20:21:54 -07:00 committed by Richard Purdie
parent 570bec37a8
commit 32ea766871
3 changed files with 157 additions and 77 deletions

View File

@ -232,48 +232,57 @@ class Cache(object):
bb_data = cls.load_bbfile(fn, appends, cfgData)
return bb_data[virtual]
@classmethod
def parse(cls, filename, appends, configdata):
    """Parse the specified filename, returning the recipe information"""
    results = []
    datastores = cls.load_bbfile(filename, appends, configdata)
    all_depends = set()
    # Walk the variants in reverse-sorted key order so the base recipe
    # (variant == "") is visited last, after every variant has
    # contributed to the accumulated __depends set.
    for variant in sorted(datastores, reverse=True):
        data = datastores[variant]
        virtualfn = cls.realfn2virtual(filename, variant)
        all_depends |= (data.getVar("__depends", False) or set())
        if all_depends and not variant:
            data.setVar("__depends", all_depends)
        results.append((virtualfn, RecipeInfo.from_metadata(filename, data)))
    return results
def load(self, filename, appends, configdata):
    """Obtain the recipe information for the specified filename,
    using cached values if available, otherwise parsing.

    Note that if it does parse to obtain the info, it will not
    automatically add the information to the cache or to your
    CacheData.  Use the add or add_info method to do so after
    running this, or use loadData instead.

    Returns a (cached, infos) tuple, where cached indicates whether
    the cache satisfied the request and infos is a list of
    (virtualfn, RecipeInfo) pairs.
    """
    cached = self.cacheValid(filename)
    if cached:
        infos = []
        info = self.depends_cache[filename]
        for variant in info.variants:
            virtualfn = self.realfn2virtual(filename, variant)
            infos.append((virtualfn, self.depends_cache[virtualfn]))
    else:
        logger.debug(1, "Parsing %s", filename)
        # Fix: previously this branch returned the bare infos list from
        # parse(), while the cached branch returned a (cached, infos)
        # tuple.  Callers (loadData, CookerParser.parse_next) unpack two
        # values, so both paths must return the same shape.
        infos = self.parse(filename, appends, configdata)
    return cached, infos
def loadData(self, fn, appends, cfgData, cacheData):
    """Load the recipe info for the specified filename,
    parsing and adding to the cache if necessary, and adding
    the recipe information to the supplied CacheData instance.

    Returns a (cached, skipped, virtuals) tuple: whether the cache was
    used, how many loaded variants were marked skipped, and the total
    number of virtual recipes handled.
    """
    skipped, virtuals = 0, 0

    cached, infos = self.load(fn, appends, cfgData)
    for virtualfn, info in infos:
        if info.skipped:
            logger.debug(1, "Skipping %s", virtualfn)
            skipped += 1
        else:
            # Only freshly-parsed (not cached) info needs to dirty the
            # on-disk cache, hence parsed="not cached".
            self.add_info(virtualfn, info, cacheData, not cached)
        virtuals += 1

    return cached, skipped, virtuals
@ -283,6 +292,9 @@ class Cache(object):
Is the cache valid for fn?
Fast version, no timestamps checked.
"""
if fn not in self.checked:
self.cacheValidUpdate(fn)
# Is cache enabled?
if not self.has_cache:
return False
@ -412,14 +424,22 @@ class Cache(object):
def mtime(cachefile):
    # Delegates to bb.parse.cached_mtime_noerror; by its name this
    # returns a cached mtime without raising for a missing file —
    # confirm against bb.parse if relied upon.
    return bb.parse.cached_mtime_noerror(cachefile)
def add(self, file_name, data, cacheData):
def add_info(self, filename, info, cacheData, parsed=None):
    """Record the recipe info in our depends cache and in cacheData.

    If the info came from an actual parse (parsed is truthy) and is not
    marked uncacheable, flag the cache dirty so it gets written out.
    """
    self.depends_cache[filename] = info
    cacheData.add_from_recipeinfo(filename, info)
    needs_writeout = bool(parsed) and not info.nocache
    if needs_writeout:
        self.cacheclean = False
def add(self, file_name, data, cacheData, parsed=None):
    """
    Save data we need into the cache

    Builds a RecipeInfo from the datastore for the real (non-virtual)
    filename and records it via add_info.
    """
    realfn = self.virtualfn2realfn(file_name)[0]
    info = RecipeInfo.from_metadata(realfn, data)
    # The direct depends_cache / add_from_recipeinfo updates that used
    # to live here are now performed by add_info; doing both would
    # duplicate the work.
    self.add_info(file_name, info, cacheData, parsed)
@staticmethod
def load_bbfile(bbfile, appends, config):

View File

@ -25,6 +25,8 @@ from __future__ import print_function
import sys, os, glob, os.path, re, time
import logging
import sre_constants
import multiprocessing
import signal
from cStringIO import StringIO
from contextlib import closing
import bb
@ -976,7 +978,7 @@ class CookerExit(bb.event.Event):
def __init__(self):
    # Carries no payload; the event type alone is the signal.
    bb.event.Event.__init__(self)
class CookerParser:
class CookerParser(object):
def __init__(self, cooker, filelist, masked):
# Internal data
self.filelist = filelist
@ -987,49 +989,106 @@ class CookerParser:
self.cached = 0
self.error = 0
self.masked = masked
self.total = len(filelist)
self.skipped = 0
self.virtuals = 0
self.total = len(filelist)
# Pointer to the next file to parse
self.pointer = 0
# Index of the next file to parse
self.current = 0
self.result_queue = None
self.fromcache = None
self.launch_processes()
def launch_processes(self):
    """Split the filelist into cached and to-be-parsed recipes, then
    start the pool of parser worker processes.

    Recipes whose cache entry is valid go onto self.fromcache for
    cheap in-process loading; everything else is queued for the
    workers.
    """
    self.task_queue = multiprocessing.Queue()
    self.result_queue = multiprocessing.Queue()

    self.fromcache = []
    cfgdata = self.cooker.configuration.data
    for filename in self.filelist:
        appends = self.cooker.get_file_appends(filename)
        if not self.cooker.bb_cache.cacheValid(filename):
            self.task_queue.put((filename, appends))
        else:
            self.fromcache.append((filename, appends))

    def worker(input, output, cfgdata):
        # Workers ignore SIGINT so Ctrl-C is handled (and shutdown
        # coordinated) solely by the parent process.
        signal.signal(signal.SIG_IGN)
        # 'STOP' is the sentinel shutdown() enqueues to end a worker.
        for filename, appends in iter(input.get, 'STOP'):
            infos = bb.cache.Cache.parse(filename, appends, cfgdata)
            output.put(infos)

    self.processes = []
    # BB_NUMBER_PARSE_THREADS overrides the default of one worker per
    # available processor core.
    num_processes = int(cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
                        multiprocessing.cpu_count())
    for i in xrange(num_processes):
        process = multiprocessing.Process(target=worker,
                                          args=(self.task_queue,
                                                self.result_queue,
                                                cfgdata))
        process.start()
        self.processes.append(process)
def shutdown(self, clean=True):
    """Stop the parser worker processes and flush caches to disk.

    With clean=True each worker is sent a 'STOP' sentinel and allowed
    to drain; otherwise workers are terminated outright.  Raises
    ParsingErrorsFound if any parse errors were recorded.
    """
    self.result_queue.close()
    for process in self.processes:
        if clean:
            # One sentinel per worker: each worker exits after
            # consuming a single 'STOP' from the shared task queue.
            self.task_queue.put('STOP')
        else:
            process.terminate()
    self.task_queue.close()
    for process in self.processes:
        process.join()
    self.cooker.bb_cache.sync()
    bb.codeparser.parser_cache_save(self.cooker.configuration.data)
    if self.error > 0:
        raise ParsingErrorsFound()
def progress(self):
    """Fire a ParseProgress event reflecting the current counters."""
    event = bb.event.ParseProgress(self.cached, self.parsed, self.skipped,
                                   self.masked, self.virtuals, self.error,
                                   self.total)
    bb.event.fire(event, self.cooker.configuration.event_data)
def parse_next(self):
    """Handle one parse result: either a recipe parsed by a worker
    process or one loaded from the valid cache.

    Returns False (after shutting the workers down) once every file
    has been handled, True otherwise.
    """
    cooker = self.cooker
    if self.current >= self.total:
        self.shutdown()
        return False

    try:
        # Prefer draining cache hits while the workers are busy;
        # otherwise block on the next worker result.
        if self.result_queue.empty() and self.fromcache:
            filename, appends = self.fromcache.pop()
            _, infos = cooker.bb_cache.load(filename, appends,
                                            self.cooker.configuration.data)
            parsed = False
        else:
            infos = self.result_queue.get()
            parsed = True
    except KeyboardInterrupt:
        self.shutdown(clean=False)
        raise
    except Exception as e:
        self.error += 1
        parselog.critical(str(e))
    else:
        if parsed:
            self.parsed += 1
        else:
            self.cached += 1
        self.virtuals += len(infos)

        for virtualfn, info in infos:
            # parsed=parsed lets add_info decide whether the on-disk
            # cache must be rewritten.
            cooker.bb_cache.add_info(virtualfn, info, cooker.status,
                                     parsed=parsed)
            if info.skipped:
                self.skipped += 1
    finally:
        self.progress()
        self.current += 1

    return True

View File

@ -648,6 +648,7 @@ def p_error(p):
try:
import pyshtables
except ImportError:
import os
outputdir = os.path.dirname(__file__)
if not os.access(outputdir, os.W_OK):
outputdir = ''