diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index af66ff05c8..db3c4b184f 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker @@ -12,7 +12,9 @@ import errno import signal import pickle import traceback +import queue from multiprocessing import Lock +from threading import Thread if sys.getfilesystemencoding() != "utf-8": sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") @@ -64,7 +66,7 @@ if 0: consolelog.setFormatter(conlogformat) logger.addHandler(consolelog) -worker_queue = b"" +worker_queue = queue.Queue() def worker_fire(event, d): data = b"" + pickle.dumps(event) + b"" @@ -73,21 +75,38 @@ def worker_fire(event, d): def worker_fire_prepickled(event): global worker_queue - worker_queue = worker_queue + event - worker_flush() + worker_queue.put(event) -def worker_flush(): - global worker_queue, worker_pipe +# +# We can end up with write contention with the cooker, it can be trying to send commands +# and we can be trying to send event data back. Therefore use a separate thread for writing +# back data to cooker. +# +worker_thread_exit = False - if not worker_queue: - return +def worker_flush(worker_queue): + worker_queue_int = b"" + global worker_pipe, worker_thread_exit - try: - written = os.write(worker_pipe, worker_queue) - worker_queue = worker_queue[written:] - except (IOError, OSError) as e: - if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: - raise + while True: + try: + worker_queue_int = worker_queue_int + worker_queue.get(True, 1) + except queue.Empty: + pass + while (worker_queue_int or not worker_queue.empty()): + try: + if not worker_queue.empty(): + worker_queue_int = worker_queue_int + worker_queue.get() + written = os.write(worker_pipe, worker_queue_int) + worker_queue_int = worker_queue_int[written:] + except (IOError, OSError) as e: + if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: + raise + if worker_thread_exit and worker_queue.empty() and not worker_queue_int: + return + +worker_thread = Thread(target=worker_flush, args=(worker_queue,)) +worker_thread.start() def worker_child_fire(event, d): global worker_pipe @@ -353,7 +372,6 @@ class BitbakeWorker(object): self.build_pipes[pipe].read() if len(self.build_pids): self.process_waitpid() - worker_flush() def handle_item(self, item, func): @@ -458,8 +476,10 @@ except BaseException as e: import traceback sys.stderr.write(traceback.format_exc()) sys.stderr.write(str(e)) -while len(worker_queue): - worker_flush() + +worker_thread_exit = True +worker_thread.join() + workerlog_write("exitting") sys.exit(0)