bitbake: Initial scenequeue implementation (needs major fixes)

bitbake: scenequeue: Skip setscene if the underlying task already ran
bitbake/setscene: Make sure uneeded dependencies are removed recursively

Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
This commit is contained in:
Richard Purdie 2010-08-19 11:36:29 +01:00
parent 41a364f59f
commit 787c1cf811
1 changed files with 262 additions and 24 deletions

View File

@ -27,6 +27,7 @@ from bb import msg, data, event
import signal
import stat
import fcntl
import copy
class RunQueueStats:
"""
@ -57,12 +58,14 @@ class RunQueueStats:
# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
runQueueRunInit = 3
runQueueRunning = 4
runQueueFailed = 6
runQueueCleanUp = 7
runQueueComplete = 8
runQueueChildProcess = 9
runQueueSceneInit = 3
runQueueSceneRun = 4
runQueueRunInit = 5
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
runQueueChildProcess = 10
class RunQueueScheduler(object):
"""
@ -672,6 +675,16 @@ class RunQueueData:
#self.dump_data(taskData)
# Interate over the task list looking for tasks with a 'setscene' function
self.runq_setscene = []
for task in range(len(self.runq_fnid)):
setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
if not setscene:
continue
bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task]))
self.runq_setscene.append(task)
def dump_data(self, taskQueue):
"""
Dump some debug information on the internal data structures
@ -802,6 +815,13 @@ class RunQueue:
return current
def check_stamp_task(self, task, taskname = None):
def get_timestamp(f):
try:
if not os.access(f, os.F_OK):
return None
return os.stat(f)[stat.ST_MTIME]
except:
return None
if self.stamppolicy == "perfile":
fulldeptree = False
@ -825,23 +845,24 @@ class RunQueue:
bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
return False
if taskname.endswith("_setscene"):
return True
iscurrent = True
t1 = os.stat(stampfile)[stat.ST_MTIME]
t1 = get_timestamp(stampfile)
for dep in self.rqdata.runq_depends[task]:
if iscurrent:
fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
taskname2 = self.rqdata.runq_task[dep]
stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
t2 = get_timestamp(stampfile2)
t3 = get_timestamp(stampfile2 + "_setscene")
if t3 and t3 > t2:
continue
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
try:
t2 = os.stat(stampfile2)[stat.ST_MTIME]
if t1 < t2:
bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile, stampfile2))
iscurrent = False
except:
bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2, stampfile))
if not t2 or t1 < t2:
bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s (or does not exist)" % (stampfile, stampfile2))
iscurrent = False
return iscurrent
def execute_runqueue(self):
@ -855,7 +876,13 @@ class RunQueue:
if self.state is runQueuePrepare:
self.rqdata.prepare()
self.state = runQueueRunInit
self.state = runQueueSceneInit
if self.state is runQueueSceneInit:
self.rqexe = RunQueueExecuteScenequeue(self)
if self.state is runQueueSceneRun:
self.rqexe.execute()
if self.state is runQueueRunInit:
bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
@ -948,14 +975,10 @@ class RunQueueExecute:
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
try:
while self.stats.active > 0:
bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
if self.runqueue_process_waitpid() is None:
return
except:
self.finish_now()
raise
if self.stats.active > 0:
bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
self.runqueue_process_waitpid()
return
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
@ -1034,6 +1057,23 @@ class RunQueueExecuteTasks(RunQueueExecute):
self.runq_buildable.append(1)
else:
self.runq_buildable.append(0)
if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
self.rq.scenequeue_covered.add(task)
found = True
while found:
found = False
for task in range(self.stats.total):
if task in self.rq.scenequeue_covered:
continue
if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
self.rq.scenequeue_covered.add(task)
found = True
bb.note("Full skip list %s" % self.rq.scenequeue_covered)
for task in self.rq.scenequeue_covered:
self.task_skip(task)
event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
@ -1145,6 +1185,204 @@ class RunQueueExecuteTasks(RunQueueExecute):
self.rq.state = runQueueComplete
return
class RunQueueExecuteScenequeue(RunQueueExecute):
def __init__(self, rq):
RunQueueExecute.__init__(self, rq)
self.scenequeue_covered = set()
self.scenequeue_notcovered = set()
# If we don't have any setscene functions, skip this step
if len(self.rqdata.runq_setscene) == 0:
rq.scenequeue_covered = set()
rq.state = runQueueRunInit
return
self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
endpoints = {}
sq_revdeps = []
sq_revdeps_new = []
sq_revdeps_squash = []
# We need to construct a dependency graph for the setscene functions. Intermediate
# dependencies between the setscene tasks only complicate the code. This code
# therefore aims to collapse the huge runqueue dependency tree into a smaller one
# only containing the setscene functions.
for task in range(self.stats.total):
self.runq_running.append(0)
self.runq_complete.append(0)
self.runq_buildable.append(0)
for task in range(len(self.rqdata.runq_fnid)):
sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
sq_revdeps_new.append(set())
if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
endpoints[task] = None
for task in self.rqdata.runq_setscene:
for dep in self.rqdata.runq_depends[task]:
endpoints[dep] = task
def process_endpoints(endpoints):
newendpoints = {}
for point, task in endpoints.items():
tasks = set()
if task:
tasks.add(task)
if sq_revdeps_new[point]:
tasks |= sq_revdeps_new[point]
sq_revdeps_new[point] = set()
for dep in self.rqdata.runq_depends[point]:
if point in sq_revdeps[dep]:
sq_revdeps[dep].remove(point)
if tasks:
sq_revdeps_new[dep] |= tasks
if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
newendpoints[dep] = task
if len(newendpoints) != 0:
process_endpoints(newendpoints)
process_endpoints(endpoints)
for task in range(len(self.rqdata.runq_fnid)):
if task in self.rqdata.runq_setscene:
deps = set()
for dep in sq_revdeps_new[task]:
deps.add(self.rqdata.runq_setscene.index(dep))
sq_revdeps_squash.append(deps)
elif len(sq_revdeps_new[task]) != 0:
bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
#for task in range(len(sq_revdeps_squash)):
# print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
self.sq_deps = []
self.sq_revdeps = sq_revdeps_squash
self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
for task in range(len(self.sq_revdeps)):
self.sq_deps.append(set())
for task in range(len(self.sq_revdeps)):
for dep in self.sq_revdeps[task]:
self.sq_deps[dep].add(task)
for task in range(len(self.sq_revdeps)):
if len(self.sq_revdeps[task]) == 0:
self.runq_buildable[task] = 1
bb.msg.note(1, bb.msg.domain.RunQueue, "Executing setscene Tasks")
self.rq.state = runQueueSceneRun
def scenequeue_updatecounters(self, task):
for dep in self.sq_deps[task]:
self.sq_revdeps2[dep].remove(task)
if len(self.sq_revdeps2[dep]) == 0:
self.runq_buildable[dep] = 1
def task_complete(self, task):
"""
Mark a task as completed
Look at the reverse dependencies and mark any task with
completed dependencies as buildable
"""
index = self.rqdata.runq_setscene[task]
bb.msg.note(1, bb.msg.domain.RunQueue, "Found task %s could be accelerated" % self.rqdata.get_user_idstring(index))
self.scenequeue_covered.add(task)
self.scenequeue_updatecounters(task)
def task_fail(self, task, result):
self.stats.taskFailed()
index = self.rqdata.runq_setscene[task]
bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
self.scenequeue_notcovered.add(task)
self.scenequeue_updatecounters(task)
def task_failoutright(self, task):
self.runq_running[task] = 1
self.runq_buildable[task] = 1
self.stats.taskCompleted()
self.stats.taskSkipped()
index = self.rqdata.runq_setscene[task]
self.scenequeue_notcovered.add(task)
self.scenequeue_updatecounters(task)
def task_skip(self, task):
self.runq_running[task] = 1
self.runq_buildable[task] = 1
self.task_complete(task)
self.stats.taskCompleted()
self.stats.taskSkipped()
def execute(self):
"""
Run the tasks in a queue prepared by prepare_runqueue
"""
task = None
if self.stats.active < self.number_tasks:
# Find the next setscene to run
for nexttask in range(self.stats.total):
if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
#bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered))
#if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered):
# bb.note("Skipping task %s" % nexttask)
# self.scenequeue_skip(nexttask)
# return True
task = nexttask
break
if task is not None:
realtask = self.rqdata.runq_setscene[task]
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
taskname = self.rqdata.runq_task[realtask] + "_setscene"
if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task)))
self.task_failoutright(task)
return True
if self.cooker.configuration.force:
for target in self.target_pairs:
if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
self.task_failoutright(task)
return True
if self.rq.check_stamp_task(realtask, taskname):
bb.msg.debug(2, bb.msg.domain.RunQueue, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask)))
self.task_skip(task)
return True
pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
self.build_pids[pid] = task
self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
self.runq_running[task] = 1
self.stats.taskActive()
if self.stats.active < self.number_tasks:
return True
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
if self.stats.active > 0:
if self.runqueue_process_waitpid() is None:
return True
return True
# Convert scenequeue_covered task numbers into full taskgraph ids
oldcovered = self.scenequeue_covered
self.rq.scenequeue_covered = set()
for task in oldcovered:
self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
bb.note("We can skip tasks %s" % self.rq.scenequeue_covered)
self.rq.state = runQueueRunInit
return True
class TaskFailure(Exception):