From 0ef16f083eddb0eccd5fd1604e6e922a38705ae5 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Mon, 15 Aug 2016 17:58:39 +0100 Subject: [PATCH] bitbake: runqueue: Abstract worker functionality to an object/array With the introduction of multi-config and the possibility of distributed builds we need arrays of workers rather than the existing two. This refactors the code to have a dict() of workers and a dict of fakeworkers, represented by objects. The code can iterate over these. This is separated out from the multi-config changes since its separable and clearer this way. (Bitbake rev: 8181d96e0a4df0aa47287669681116fa65bcae16) Signed-off-by: Richard Purdie --- bitbake/lib/bb/runqueue.py | 122 ++++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 56 deletions(-) diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 3a593b6c4b..6a953b844a 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -922,6 +922,11 @@ class RunQueueData: self.runtaskentries[tid].depends, self.runtaskentries[tid].revdeps) +class RunQueueWorker(): + def __init__(self, process, pipe): + self.process = process + self.pipe = pipe + class RunQueue: def __init__(self, cooker, cfgData, dataCache, taskData, targets): @@ -940,10 +945,8 @@ class RunQueue: self.dm = monitordisk.diskMonitor(cfgData) self.rqexe = None - self.worker = None - self.workerpipe = None - self.fakeworker = None - self.fakeworkerpipe = None + self.worker = {} + self.fakeworker = {} def _start_worker(self, fakeroot = False, rqexec = None): logger.debug(1, "Starting bitbake-worker") @@ -988,55 +991,56 @@ class RunQueue: worker.stdin.write(b"" + pickle.dumps(workerdata) + b"") worker.stdin.flush() - return worker, workerpipe + return RunQueueWorker(worker, workerpipe) - def _teardown_worker(self, worker, workerpipe): + def _teardown_worker(self, worker): if not worker: return logger.debug(1, "Teardown for bitbake-worker") try: - worker.stdin.write(b"") - worker.stdin.flush() - worker.stdin.close() + worker.process.stdin.write(b"") + worker.process.stdin.flush() + worker.process.stdin.close() except IOError: pass - while worker.returncode is None: - workerpipe.read() - worker.poll() - while workerpipe.read(): + while worker.process.returncode is None: + worker.pipe.read() + worker.process.poll() + while worker.pipe.read(): continue - workerpipe.close() + worker.pipe.close() def start_worker(self): if self.worker: self.teardown_workers() self.teardown = False - self.worker, self.workerpipe = self._start_worker() + self.worker[''] = self._start_worker() def start_fakeworker(self, rqexec): if not self.fakeworker: - self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec) + self.fakeworker[''] = self._start_worker(True, rqexec) def teardown_workers(self): self.teardown = True - self._teardown_worker(self.worker, self.workerpipe) - self.worker = None - self.workerpipe = None - self._teardown_worker(self.fakeworker, self.fakeworkerpipe) - self.fakeworker = None - self.fakeworkerpipe = None + for mc in self.worker: + self._teardown_worker(self.worker[mc]) + self.worker = {} + for mc in self.fakeworker: + self._teardown_worker(self.fakeworker[mc]) + self.fakeworker = {} def read_workers(self): - self.workerpipe.read() - if self.fakeworkerpipe: - self.fakeworkerpipe.read() + for mc in self.worker: + self.worker[mc].pipe.read() + for mc in self.fakeworker: + self.fakeworker[mc].pipe.read() def active_fds(self): fds = [] - if self.workerpipe: - fds.append(self.workerpipe.input) - if self.fakeworkerpipe: - fds.append(self.fakeworkerpipe.input) + for mc in self.worker: + fds.append(self.worker[mc].pipe.input) + for mc in self.fakeworker: + fds.append(self.fakeworker[mc].pipe.input) return fds def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): @@ -1393,9 +1397,10 @@ class RunQueueExecute: self.stampcache = {} - rq.workerpipe.setrunqueueexec(self) - if rq.fakeworkerpipe: - rq.fakeworkerpipe.setrunqueueexec(self) + for mc in rq.worker: + rq.worker[mc].pipe.setrunqueueexec(self) + for mc in rq.fakeworker: + rq.fakeworker[mc].pipe.setrunqueueexec(self) if self.number_tasks <= 0: bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) @@ -1414,15 +1419,21 @@ class RunQueueExecute: return True def finish_now(self): - for worker in [self.rq.worker, self.rq.fakeworker]: - if not worker: - continue + for mc in self.rq.worker: try: - worker.stdin.write(b"") - worker.stdin.flush() + self.rq.worker[mc].process.stdin.write(b"") + self.rq.worker[mc].process.stdin.flush() except IOError: # worker must have died? pass + for mc in self.rq.fakeworker: + try: + self.rq.fakeworker[mc].process.stdin.write(b"") + self.rq.fakeworker[mc].process.stdin.flush() + except IOError: + # worker must have died? + pass + if len(self.failed_fns) != 0: self.rq.state = runQueueFailed return @@ -1733,11 +1744,11 @@ class RunQueueExecuteTasks(RunQueueExecute): logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) self.rq.state = runQueueFailed return True - self.rq.fakeworker.stdin.write(b"" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"") - self.rq.fakeworker.stdin.flush() + self.rq.fakeworker[''].process.stdin.write(b"" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"") + self.rq.fakeworker[''].process.stdin.flush() else: - self.rq.worker.stdin.write(b"" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"") - self.rq.worker.stdin.flush() + self.rq.worker[''].process.stdin.write(b"" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"") + self.rq.worker[''].process.stdin.flush() self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) self.build_stamps2.append(self.build_stamps[task]) @@ -2143,11 +2154,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute): if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: if not self.rq.fakeworker: self.rq.start_fakeworker(self) - self.rq.fakeworker.stdin.write(b"" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"") - self.rq.fakeworker.stdin.flush() + self.rq.fakeworker[''].process.stdin.write(b"" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"") + self.rq.fakeworker[''].process.stdin.flush() else: - self.rq.worker.stdin.write(b"" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"") - self.rq.worker.stdin.flush() + self.rq.worker[''].process.stdin.write(b"" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"") + self.rq.worker[''].process.stdin.flush() self.runq_running.add(task) self.stats.taskActive() @@ -2301,17 +2312,16 @@ class runQueuePipe(): def read(self): for w in [self.rq.worker, self.rq.fakeworker]: - if not w: - continue - w.poll() - if w.returncode is not None and not self.rq.teardown: - name = None - if self.rq.worker and w.pid == self.rq.worker.pid: - name = "Worker" - elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid: - name = "Fakeroot" - bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode))) - self.rq.finish_runqueue(True) + for mc in w: + w[mc].process.poll() + if w[mc].process.returncode is not None and not self.rq.teardown: + name = None + if w in self.rq.worker: + name = "Worker" + elif w in self.rq.fakeworker: + name = "Fakeroot" + bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode))) + self.rq.finish_runqueue(True) start = len(self.queue) try: