bitbake: runqueue: Abstract worker functionality to an object/array

With the introduction of multi-config and the possibility of distributed
builds we need arrays of workers rather than the existing two.

This refactors the code to have a dict() of workers and a dict of
fakeworkers, represented by objects. The code can iterate over these.

This is separated out from the multi-config changes since its separable
and clearer this way.

(Bitbake rev: 8181d96e0a4df0aa47287669681116fa65bcae16)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Richard Purdie 2016-08-15 17:58:39 +01:00
parent 249686927b
commit 0ef16f083e
1 changed files with 66 additions and 56 deletions

View File

@ -922,6 +922,11 @@ class RunQueueData:
self.runtaskentries[tid].depends,
self.runtaskentries[tid].revdeps)
class RunQueueWorker():
def __init__(self, process, pipe):
self.process = process
self.pipe = pipe
class RunQueue:
def __init__(self, cooker, cfgData, dataCache, taskData, targets):
@ -940,10 +945,8 @@ class RunQueue:
self.dm = monitordisk.diskMonitor(cfgData)
self.rqexe = None
self.worker = None
self.workerpipe = None
self.fakeworker = None
self.fakeworkerpipe = None
self.worker = {}
self.fakeworker = {}
def _start_worker(self, fakeroot = False, rqexec = None):
logger.debug(1, "Starting bitbake-worker")
@ -988,55 +991,56 @@ class RunQueue:
worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
worker.stdin.flush()
return worker, workerpipe
return RunQueueWorker(worker, workerpipe)
def _teardown_worker(self, worker, workerpipe):
def _teardown_worker(self, worker):
if not worker:
return
logger.debug(1, "Teardown for bitbake-worker")
try:
worker.stdin.write(b"<quit></quit>")
worker.stdin.flush()
worker.stdin.close()
worker.process.stdin.write(b"<quit></quit>")
worker.process.stdin.flush()
worker.process.stdin.close()
except IOError:
pass
while worker.returncode is None:
workerpipe.read()
worker.poll()
while workerpipe.read():
while worker.process.returncode is None:
worker.pipe.read()
worker.process.poll()
while worker.pipe.read():
continue
workerpipe.close()
worker.pipe.close()
def start_worker(self):
if self.worker:
self.teardown_workers()
self.teardown = False
self.worker, self.workerpipe = self._start_worker()
self.worker[''] = self._start_worker()
def start_fakeworker(self, rqexec):
if not self.fakeworker:
self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
self.fakeworker[''] = self._start_worker(True, rqexec)
def teardown_workers(self):
self.teardown = True
self._teardown_worker(self.worker, self.workerpipe)
self.worker = None
self.workerpipe = None
self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
self.fakeworker = None
self.fakeworkerpipe = None
for mc in self.worker:
self._teardown_worker(self.worker[mc])
self.worker = {}
for mc in self.fakeworker:
self._teardown_worker(self.fakeworker[mc])
self.fakeworker = {}
def read_workers(self):
self.workerpipe.read()
if self.fakeworkerpipe:
self.fakeworkerpipe.read()
for mc in self.worker:
self.worker[mc].pipe.read()
for mc in self.fakeworker:
self.fakeworker[mc].pipe.read()
def active_fds(self):
fds = []
if self.workerpipe:
fds.append(self.workerpipe.input)
if self.fakeworkerpipe:
fds.append(self.fakeworkerpipe.input)
for mc in self.worker:
fds.append(self.worker[mc].pipe.input)
for mc in self.fakeworker:
fds.append(self.fakeworker[mc].pipe.input)
return fds
def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
@ -1393,9 +1397,10 @@ class RunQueueExecute:
self.stampcache = {}
rq.workerpipe.setrunqueueexec(self)
if rq.fakeworkerpipe:
rq.fakeworkerpipe.setrunqueueexec(self)
for mc in rq.worker:
rq.worker[mc].pipe.setrunqueueexec(self)
for mc in rq.fakeworker:
rq.fakeworker[mc].pipe.setrunqueueexec(self)
if self.number_tasks <= 0:
bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
@ -1414,15 +1419,21 @@ class RunQueueExecute:
return True
def finish_now(self):
for worker in [self.rq.worker, self.rq.fakeworker]:
if not worker:
continue
for mc in self.rq.worker:
try:
worker.stdin.write(b"<finishnow></finishnow>")
worker.stdin.flush()
self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
self.rq.worker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
for mc in self.rq.fakeworker:
try:
self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
self.rq.fakeworker[mc].process.stdin.flush()
except IOError:
# worker must have died?
pass
if len(self.failed_fns) != 0:
self.rq.state = runQueueFailed
return
@ -1733,11 +1744,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
self.rq.state = runQueueFailed
return True
self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
self.rq.fakeworker.stdin.flush()
self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
self.rq.fakeworker[''].process.stdin.flush()
else:
self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
self.rq.worker.stdin.flush()
self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
self.rq.worker[''].process.stdin.flush()
self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
self.build_stamps2.append(self.build_stamps[task])
@ -2143,11 +2154,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
if not self.rq.fakeworker:
self.rq.start_fakeworker(self)
self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
self.rq.fakeworker.stdin.flush()
self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
self.rq.fakeworker[''].process.stdin.flush()
else:
self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
self.rq.worker.stdin.flush()
self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
self.rq.worker[''].process.stdin.flush()
self.runq_running.add(task)
self.stats.taskActive()
@ -2301,17 +2312,16 @@ class runQueuePipe():
def read(self):
for w in [self.rq.worker, self.rq.fakeworker]:
if not w:
continue
w.poll()
if w.returncode is not None and not self.rq.teardown:
name = None
if self.rq.worker and w.pid == self.rq.worker.pid:
name = "Worker"
elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
name = "Fakeroot"
bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
self.rq.finish_runqueue(True)
for mc in w:
w[mc].process.poll()
if w[mc].process.returncode is not None and not self.rq.teardown:
name = None
if w in self.rq.worker:
name = "Worker"
elif w in self.rq.fakeworker:
name = "Fakeroot"
bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
self.rq.finish_runqueue(True)
start = len(self.queue)
try: