Here is the code; I tried going through join(), but it had no effect at all.
from threading import Thread, currentThread
import time
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty
import logging
from grab.util.py3k_support import *
STOP = object()
class Worker(Thread):
    def __init__(self, callback, taskq, resultq, ignore_exceptions,
                 *args, **kwargs):
        self.callback = callback
        self.taskq = taskq
        self.resultq = resultq
        self.ignore_exceptions = ignore_exceptions
        Thread.__init__(self, *args, **kwargs)

    def run(self):
        while True:
            task = self.taskq.get()
            if task is STOP:
                return
            else:
                try:
                    self.resultq.put(self.callback(task))
                except Exception as ex:
                    if self.ignore_exceptions:
                        logging.error('', exc_info=ex)
                    else:
                        raise
def make_work(callback, tasks, limit, ignore_exceptions=True,
              taskq_size=50):
    """
    Run up to `limit` threads, process the tasks and yield results.

    :param callback: the function that will process a single task
    :param tasks: a sequence, iterator or queue of tasks; each task in turn
        is a sequence of arguments, so if a task is just a single argument
        it should be wrapped into a list or tuple
    :param limit: the maximum number of threads
    """
    # If tasks is a number, convert it to a range of that size
    if isinstance(tasks, int):
        tasks = xrange(tasks)
    # Ensure that the tasks sequence is an iterator
    tasks = iter(tasks)
    taskq = Queue(taskq_size)
    # Results of task processing will be collected here
    resultq = Queue()
    # Prepare and start up to `limit` worker threads
    threads = []
    for x in xrange(limit):
        thread = Worker(callback, taskq, resultq, ignore_exceptions)
        thread.daemon = True
        thread.start()
        threads.append(thread)

    # Feed tasks from the tasks iterator into the taskq queue
    # until the iterator is exhausted; do it in a separate thread
    def task_processor(task_iter, task_queue, limit):
        try:
            for task in task_iter:
                task_queue.put(task)
        finally:
            # Always send the STOP marker to every worker,
            # even if the task iterator raised an exception
            for x in xrange(limit):
                task_queue.put(STOP)

    processor = Thread(target=task_processor, args=[tasks, taskq, limit])
    processor.daemon = True
    processor.start()
    # Yield results while at least one worker is still alive
    while True:
        try:
            yield resultq.get(True, 0.2)
        except Empty:
            pass
        if not any(x.is_alive() for x in threads):
            break

    # Drain whatever is left in the result queue
    while True:
        try:
            yield resultq.get(False)
        except Empty:
            break
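
For completeness, here is a minimal usage sketch (process_url and the URL list are hypothetical, only the call shape of make_work matters). Since make_work is a generator, the workers only produce results while the caller iterates over it, and the loop above ends on its own once every worker thread has died:

# Hypothetical example, assuming make_work is defined in the same module
def process_url(task):
    # each task is a tuple of arguments prepared by the caller
    url = task[0]
    return len(url)

if __name__ == '__main__':
    tasks = [('http://example.com/page%d' % x,) for x in range(10)]
    # the worker threads run only while the generator is being consumed
    for result in make_work(process_url, tasks, limit=3):
        print(result)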