oe/utils.py: Fix thread leakage in ThreadPool

In order to fix Thread leakage caused by not call join() in Threads,

Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.

classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.

(From OE-Core rev: 524d92ed7b53bef933527095e82f378b934f25ef)

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Aníbal Limón 2015-06-23 11:49:53 -05:00 committed by Richard Purdie
parent 18e902b2dc
commit 3fa32158c4
2 changed files with 22 additions and 7 deletions

View File

@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist))
import multiprocessing
nproc = min(multiprocessing.cpu_count(), len(tasklist))
pool = oe.utils.ThreadedPool(nproc)
pool = oe.utils.ThreadedPool(nproc, len(tasklist))
for t in tasklist:
pool.add_task(checkstatus, t)
pool.start()
pool.wait_completion()
inheritlist = d.getVar("INHERIT", True)

View File

@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
from Queue import Empty
while True:
func, args, kargs = self.tasks.get()
try:
func, args, kargs = self.tasks.get(block=False)
except Empty:
break
try:
func(*args, **kargs)
except Exception, e:
@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
class ThreadedPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads): ThreadedWorker(self.tasks)
def __init__(self, num_workers, num_tasks):
self.tasks = Queue(num_tasks)
self.workers = []
for _ in range(num_workers):
worker = ThreadedWorker(self.tasks)
self.workers.append(worker)
def start(self):
for worker in self.workers:
worker.start()
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
@ -247,4 +260,5 @@ class ThreadedPool:
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
for worker in self.workers:
worker.join()