bitbake/runqueue.py: Sync with changes in upstream bitbake

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2011-01-10 12:48:49 +00:00
parent 5cc720aac2
commit 9336ba1fd2
1 changed files with 80 additions and 90 deletions

View File

@ -22,13 +22,13 @@ Handles preparation and execution of a queue of tasks
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import copy
import os
import sys
import subprocess
import signal
import stat
import fcntl
import copy
import logging
import bb
from bb import msg, data, event
@ -36,12 +36,6 @@ from bb import msg, data, event
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
try:
import cPickle as pickle
except ImportError:
import pickle
logger.info("Importing cPickle failed. Falling back to a very slow implementation.")
class RunQueueStats:
"""
Holds statistics on the tasks handled by the associated runQueue
@ -93,28 +87,28 @@ class RunQueueScheduler(object):
"""
self.rq = runqueue
self.rqdata = rqdata
numTasks = len(self.rq.runq_fnid)
numTasks = len(self.rqdata.runq_fnid)
self.prio_map = []
self.prio_map.extend(range(numTasks))
def next_buildable_tasks(self):
def next_buildable_task(self):
"""
Return the id of the first task we find that is buildable
"""
for tasknum in range(len(self.rqdata.runq_fnid)):
for tasknum in xrange(len(self.rqdata.runq_fnid)):
taskid = self.prio_map[tasknum]
if self.rq.runq_running[taskid] == 1:
continue
if self.rq.runq_buildable[taskid] == 1:
yield taskid
return taskid
def next(self):
"""
Return the id of the task we should build next
"""
if self.rq.stats.active < self.rq.number_tasks:
return next(self.next_buildable_tasks(), None)
return self.next_buildable_task()
class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
@ -127,13 +121,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler):
"""
The priority map is sorted by task weight.
"""
from copy import deepcopy
self.rq = runqueue
self.rqdata = rqdata
sortweight = sorted(deepcopy(self.rqdata.runq_weight))
copyweight = deepcopy(self.rqdata.runq_weight)
sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
copyweight = copy.deepcopy(self.rqdata.runq_weight)
self.prio_map = []
for weight in sortweight:
@ -155,12 +148,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
def __init__(self, runqueue, rqdata):
RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
from copy import deepcopy
#FIXME - whilst this groups all fnids together it does not reorder the
#fnid groups optimally.
basemap = deepcopy(self.prio_map)
basemap = copy.deepcopy(self.prio_map)
self.prio_map = []
while (len(basemap) > 0):
entry = basemap.pop(0)
@ -190,25 +182,6 @@ class RunQueueData:
self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
self.schedulers = set(obj for obj in globals().itervalues()
if type(obj) is type and issubclass(obj, RunQueueScheduler))
user_schedulers = bb.data.getVar("BB_SCHEDULERS", cfgData, True)
if user_schedulers:
for sched in user_schedulers.split():
if not "." in sched:
bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
continue
modname, name = sched.rsplit(".", 1)
try:
module = __import__(modname, fromlist=(name,))
except ImportError, exc:
logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
raise SystemExit(1)
else:
self.schedulers.add(getattr(module, name))
self.reset()
def reset(self):
@ -313,7 +286,7 @@ class RunQueueData:
if dep in explored_deps[revdep]:
scan = True
if scan:
find_chains(revdep, deepcopy(prev_chain))
find_chains(revdep, copy.deepcopy(prev_chain))
for dep in explored_deps[revdep]:
if dep not in total_deps:
total_deps.append(dep)
@ -715,20 +688,15 @@ class RunQueueData:
stampfnwhitelist.append(fn)
self.stampfnwhitelist = stampfnwhitelist
#self.dump_data(taskData)
# Interate over the task list looking for tasks with a 'setscene' function
self.runq_setscene = []
for task in range(len(self.runq_fnid)):
setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
if not setscene:
continue
#bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task]))
self.runq_setscene.append(task)
# Interate over the task list and call into the siggen code
dealtwith = set()
todeal = set(range(len(self.runq_fnid)))
while len(todeal) > 0:
@ -744,7 +712,7 @@ class RunQueueData:
hashdata = {}
hashdata["hashes"] = {}
hashdata["deps"] = {}
for task in range(len(self.runq_fnid)):
for task in xrange(len(self.runq_fnid)):
hashdata["hashes"][self.taskData.fn_index[self.runq_fnid[task]] + "." + self.runq_task[task]] = self.runq_hash[task]
deps = []
for dep in self.runq_depends[task]:
@ -764,24 +732,24 @@ class RunQueueData:
Dump some debug information on the internal data structures
"""
logger.debug(3, "run_tasks:")
for task in range(len(self.rqdata.runq_task)):
logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
taskQueue.fn_index[self.rqdata.runq_fnid[task]],
self.rqdata.runq_task[task],
self.rqdata.runq_weight[task],
self.rqdata.runq_depends[task],
self.rqdata.runq_revdeps[task]))
for task in xrange(len(self.rqdata.runq_task)):
logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
taskQueue.fn_index[self.rqdata.runq_fnid[task]],
self.rqdata.runq_task[task],
self.rqdata.runq_weight[task],
self.rqdata.runq_depends[task],
self.rqdata.runq_revdeps[task])
logger.debug(3, "sorted_tasks:")
for task1 in range(len(self.rqdata.runq_task)):
for task1 in xrange(len(self.rqdata.runq_task)):
if task1 in self.prio_map:
task = self.prio_map[task1]
logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
taskQueue.fn_index[self.rqdata.runq_fnid[task]],
self.rqdata.runq_task[task],
self.rqdata.runq_weight[task],
self.rqdata.runq_depends[task],
self.rqdata.runq_revdeps[task]))
logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
taskQueue.fn_index[self.rqdata.runq_fnid[task]],
self.rqdata.runq_task[task],
self.rqdata.runq_weight[task],
self.rqdata.runq_depends[task],
self.rqdata.runq_revdeps[task])
class RunQueue:
def __init__(self, cooker, cfgData, dataCache, taskData, targets):
@ -809,7 +777,7 @@ class RunQueue:
if self.stamppolicy == "whitelist":
stampwhitelist = self.rqdata.stampfnwhitelist
for task in range(len(self.rqdata.runq_fnid)):
for task in xrange(len(self.rqdata.runq_fnid)):
unchecked[task] = ""
if len(self.rqdata.runq_depends[task]) == 0:
buildable.append(task)
@ -824,7 +792,7 @@ class RunQueue:
if revdep in unchecked:
buildable.append(revdep)
for task in range(len(self.rqdata.runq_fnid)):
for task in xrange(len(self.rqdata.runq_fnid)):
if task not in unchecked:
continue
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
@ -909,7 +877,7 @@ class RunQueue:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
if taskname is None:
taskname = self.rqdata.runq_task[task]
stampfile = bb.parse.siggen.stampfile(self.rqdata.dataCache.stamp[fn], fn, taskname)
# If the stamp is missing its not current
@ -919,7 +887,7 @@ class RunQueue:
# If its a 'nostamp' task, it's not current
taskdep = self.rqdata.dataCache.task_deps[fn]
if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
logger.debug(2, "%s.%s is nostamp\n" % (fn, taskname))
logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
return False
if taskname != "do_setscene" and taskname.endswith("_setscene"):
@ -939,10 +907,10 @@ class RunQueue:
continue
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
if not t2:
logger.debug(2, "Stampfile %s does not exist" % (stampfile2))
logger.debug(2, 'Stampfile %s does not exist', stampfile2)
iscurrent = False
if t1 < t2:
logger.debug(2, "Stampfile %s < %s" % (stampfile, stampfile2))
logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
iscurrent = False
return iscurrent
@ -1014,7 +982,7 @@ class RunQueue:
bb.note("Reparsing files to collect dependency data")
for task in range(len(self.rqdata.runq_fnid)):
if self.rqdata.runq_fnid[task] not in done:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
done.add(self.rqdata.runq_fnid[task])
@ -1219,14 +1187,38 @@ class RunQueueExecuteTasks(RunQueueExecute):
event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
for scheduler in self.rqdata.schedulers:
schedulers = self.get_schedulers()
for scheduler in schedulers:
if self.scheduler == scheduler.name:
self.sched = scheduler(self, self.rqdata)
logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
break
else:
bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
(self.scheduler, ", ".join(obj.name for obj in self.rqdata.schedulers)))
bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
(self.scheduler, ", ".join(obj.name for obj in schedulers)))
def get_schedulers(self):
schedulers = set(obj for obj in globals().values()
if type(obj) is type and
issubclass(obj, RunQueueScheduler))
user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
if user_schedulers:
for sched in user_schedulers.split():
if not "." in sched:
bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
continue
modname, name = sched.rsplit(".", 1)
try:
module = __import__(modname, fromlist=(name,))
except ImportError, exc:
logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
raise SystemExit(1)
else:
schedulers.add(getattr(module, name))
return schedulers
def task_completeoutright(self, task):
"""
@ -1283,12 +1275,14 @@ class RunQueueExecuteTasks(RunQueueExecute):
# nothing to do
self.rq.state = runQueueCleanUp
for task in iter(self.sched.next, None):
task = self.sched.next()
if task is not None:
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
taskname = self.rqdata.runq_task[task]
if self.rq.check_stamp_task(task, taskname):
logger.debug(2, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task)))
logger.debug(2, "Stamp current task %s (%s)", task,
self.rqdata.get_user_idstring(task))
self.task_skip(task)
return True
@ -1455,12 +1449,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
for task in xrange(len(self.sq_revdeps)):
if task not in valid_new and task not in noexec:
logger.debug(2, "No package found so skipping setscene task %s" % (self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))
logger.debug(2, 'No package found, so skipping setscene task %s',
self.rqdata.get_user_idstring(task))
self.task_failoutright(task)
#print(str(valid))
logger.info("Executing SetScene Tasks")
logger.info('Executing SetScene Tasks')
self.rq.state = runQueueSceneRun
@ -1521,11 +1514,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
# Find the next setscene to run
for nexttask in xrange(self.stats.total):
if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
#bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered))
#if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered):
# bb.note("Skipping task %s" % nexttask)
# self.scenequeue_skip(nexttask)
# return True
task = nexttask
break
if task is not None:
@ -1534,7 +1522,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
taskname = self.rqdata.runq_task[realtask] + "_setscene"
if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
logger.debug(2, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task)))
logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
task, self.rqdata.get_user_idstring(task))
self.task_failoutright(task)
return True
@ -1545,7 +1534,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
return True
if self.rq.check_stamp_task(realtask, taskname):
logger.debug(2, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask)))
logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
task, self.rqdata.get_user_idstring(realtask))
self.task_skip(task)
return True
@ -1575,7 +1565,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
for task in oldcovered:
self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
bb.debug(1, "We can skip tasks %s" % self.rq.scenequeue_covered)
logger.debug(1, 'We can skip tasks %s', self.rq.scenequeue_covered)
self.rq.state = runQueueRunInit
return True
@ -1630,12 +1620,12 @@ class runQueueTaskCompleted(runQueueEvent):
"""
#def check_stamp_fn(fn, taskname, d):
# rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
# rqexe = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
# fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
# fnid = rq.rqdata.taskData.getfn_id(fn)
# taskid = rq.get_task_id(fnid, taskname)
# fnid = rqexe.rqdata.taskData.getfn_id(fn)
# taskid = rqexe.rqdata.get_task_id(fnid, taskname)
# if taskid is not None:
# return rq.check_stamp_task(taskid)
# return rqexe.rq.check_stamp_task(taskid)
# return None
class runQueuePipe():
@ -1643,17 +1633,17 @@ class runQueuePipe():
Abstraction for a pipe between a worker thread and the server
"""
def __init__(self, pipein, pipeout, d):
self.fd = pipein
self.input = pipein
pipeout.close()
fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK)
fcntl.fcntl(self.input, fcntl.F_SETFL, fcntl.fcntl(self.input, fcntl.F_GETFL) | os.O_NONBLOCK)
self.queue = ""
self.d = d
def read(self):
start = len(self.queue)
try:
self.queue = self.queue + self.fd.read(1024)
except IOError:
self.queue = self.queue + self.input.read(1024)
except (OSError, IOError):
pass
end = len(self.queue)
index = self.queue.find("</event>")
@ -1668,4 +1658,4 @@ class runQueuePipe():
continue
if len(self.queue) > 0:
print("Warning, worker left partial message: %s" % self.queue)
self.fd.close()
self.input.close()