Уведомления

Группа в Telegram: @pythonsu

#1 Дек. 18, 2014 16:19:48

kampella
Зарегистрирован: 2014-11-21
Сообщения: 95
Репутация: +  1  -
Профиль   Отправить e-mail  

Таймаут для потоков в grab.tools.work.make_work

Возможно ли установить таймаут для каждого потока в grab.tools.work.make_work?

Офлайн

#2 Дек. 19, 2014 00:09:45

kampella
Зарегистрирован: 2014-11-21
Сообщения: 95
Репутация: +  1  -
Профиль   Отправить e-mail  

Таймаут для потоков в grab.tools.work.make_work

Вот код, пытался через join(), но эффекта 0.

from threading import Thread, currentThread
import time
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty
import logging
from grab.util.py3k_support import *
STOP = object()
class Worker(Thread):
    def __init__(self, callback, taskq, resultq, ignore_exceptions, *args, **kwargs):
        self.callback = callback
        self.taskq = taskq
        self.resultq = resultq
        self.ignore_exceptions = ignore_exceptions
        Thread.__init__(self, *args, **kwargs)
    def run(self):
        while True:
            task = self.taskq.get()
            if task is STOP:
                return
            else:
                try:
                    self.resultq.put(self.callback(task))
                except Exception as ex:
                    if self.ignore_exceptions:
                        logging.error('', exc_info=ex)
                    else:
                        raise
def make_work(callback, tasks, limit, ignore_exceptions=True,
        taskq_size=50):
    """
    Run up to "limit" threads, do tasks and yield results.
    :param callback:  the function that will process single task
    :param tasks:  the sequence or iterator or queue of tasks, each task
        in turn is sequence of arguments, if task is just signle argument
        it should be wrapped into list or tuple
    :param limit: the maximum number of threads
    """
    # If tasks is number convert it to the list of number
    if isinstance(tasks, int):
        tasks = xrange(tasks)
    # Ensure that tasks sequence is iterator
    tasks = iter(tasks)
    taskq= Queue(taskq_size)
    # Here results of task processing will be saved
    resultq= Queue()
    # Prepare and run up to "limit" threads
    threads = []
    for x in xrange(limit):
        thread = Worker(callback, taskq, resultq, ignore_exceptions)
        thread.daemon = True
        thread.start()
        threads.append(thread)
    # Put tasks from tasks iterator to taskq queue
    # until tasks iterator ends
    # Do it in separate thread
    def task_processor(task_iter, task_queue, limit):
        try:
try:
            for task in task_iter:
                task_queue.put(task)
        finally:
            for x in xrange(limit):
                task_queue.put(STOP)
    processor = Thread(target=task_processor, args=[tasks, taskq, limit])
    processor.daemon = True
    processor.start()
    while True:
        try:
            yield resultq.get(True, 0.2)
        except Empty:
            pass
        if not any(x.isAlive() for x in threads):
            break
    while True:
        try:
            yield resultq.get(False)
        except Empty:
            break

Офлайн

#3 Дек. 26, 2014 06:46:52

plusplus
От:
Зарегистрирован: 2009-01-05
Сообщения: 418
Репутация: +  15  -
Профиль   Отправить e-mail  

Таймаут для потоков в grab.tools.work.make_work

Что подразумевается под таймаутом? Почему нельзя просто в функции, котороую парралеллим, не прописать time.sleep?



Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version