Форум сайта python.su
1
Возможно ли установить таймаут для каждого потока в grab.tools.work.make_work?
Офлайн
1
Вот код, пытался через join(), но эффекта 0.
from threading import Thread, currentThread import time try: from Queue import Queue, Empty except ImportError: from queue import Queue, Empty import logging from grab.util.py3k_support import * STOP = object() class Worker(Thread): def __init__(self, callback, taskq, resultq, ignore_exceptions, *args, **kwargs): self.callback = callback self.taskq = taskq self.resultq = resultq self.ignore_exceptions = ignore_exceptions Thread.__init__(self, *args, **kwargs) def run(self): while True: task = self.taskq.get() if task is STOP: return else: try: self.resultq.put(self.callback(task)) except Exception as ex: if self.ignore_exceptions: logging.error('', exc_info=ex) else: raise def make_work(callback, tasks, limit, ignore_exceptions=True, taskq_size=50): """ Run up to "limit" threads, do tasks and yield results. :param callback: the function that will process single task :param tasks: the sequence or iterator or queue of tasks, each task in turn is sequence of arguments, if task is just signle argument it should be wrapped into list or tuple :param limit: the maximum number of threads """ # If tasks is number convert it to the list of number if isinstance(tasks, int): tasks = xrange(tasks) # Ensure that tasks sequence is iterator tasks = iter(tasks) taskq= Queue(taskq_size) # Here results of task processing will be saved resultq= Queue() # Prepare and run up to "limit" threads threads = [] for x in xrange(limit): thread = Worker(callback, taskq, resultq, ignore_exceptions) thread.daemon = True thread.start() threads.append(thread) # Put tasks from tasks iterator to taskq queue # until tasks iterator ends # Do it in separate thread def task_processor(task_iter, task_queue, limit): try: try: for task in task_iter: task_queue.put(task) finally: for x in xrange(limit): task_queue.put(STOP) processor = Thread(target=task_processor, args=[tasks, taskq, limit]) processor.daemon = True processor.start() while True: try: yield resultq.get(True, 0.2) except Empty: pass if not any(x.isAlive() for x in threads): break while True: try: yield resultq.get(False) except Empty: break
Офлайн
15
Что подразумевается под таймаутом? Почему нельзя просто в функции, котороую парралеллим, не прописать time.sleep?
Офлайн