Форум сайта python.su
SilverstormКак насчет того, чтобы иметь задачи только одного уровня? Задача будет состоять в полной обработке одного фида. Это сильно упростит обработку, уберет необходимость передавать большие объекты между задачами и по завершению задачи вы будете точно знать, что один фид полностью обработан.
10.000 документов а так как они обрабатываются по части lxml то памяти процесс может сожрать прилично (даже если считать что один документ это 50кб html данных то, если не использовать приоритетности обработки документов в конце у нас будет одних данных в памяти на 500мб) поэтому я решил ввести приоритетность - как только обработка документа закончена - мы можем смело его сбросить в бд и освободить память.
Офлайн
Ed
Спасибо за комментарии.
1. Да действительно к __main__ надо привыкать, это все отголоски пхп
2. Никогда о нем не слышал, ушел читать :)
3. Ну это же вроде указание куда скидывать логи для стандартного logging
4. Судя по документации, число как первый эл кортежа требуется для queue.PriorityQueue(), это значение приоритета задачи, чем ниже приоритет тем важнее.
5. Согласен :) Но ведь я, с самого начала не знал с какой стороны подойти к задаче.
6. Это мой фетишь - обычно я пишу процедурный мокап, а потом причесываю его в класс/пакет в задачах которые я не знаю как сделать сходу.
7. Нет даже не писал, ввиду пункта 6. писать докстринги для когда который совсем не финальная версия и даже непонятно попадет ли туда мне почему-то кажется безсмысленным :/
EdИдея думаю по крайней мере спорная, смотрите, т.к. будет канал использоватся далеко неравномерно - в одном фиде будет например на 1100 соединений/загрузок (100 постов и 10 картинок в каждом) и это будет прекрасно - весь пул будет задействован, а в других фидах будет на 100-200 соединений/загрузок задачь и сервер будет простаивать, мой приоритет использовать канал как можно интенсивнее и за счет чего как можно меньше тратить времени на апдейт данных.
Как насчет того, чтобы иметь задачи только одного уровня? Задача будет состоять в полной обработке одного фида. Это сильно упростит обработку, уберет необходимость передавать большие объекты между задачами и по завершению задачи вы будете точно знать, что один фид полностью обработан.
Отредактировано (Дек. 7, 2010 19:21:22)
Офлайн
3. Это я чего-то решил, что это конфигурационный файл. А это просто лог. Сорри.
4. Ага, я это умудрился пропустить.
6. Можно и так. Кстати, неплохо было бы сделать это решение generic, то есть изобразить некий API, которому передается соответствие обработчиков таскам, количество воркеров, а остальное он сам делает. И другой generic API, который позволит таски создавать.
Каким эмпирическим путем можно подобрать правильное количество воркеров в пуле?Путем тестирования на заранее определенном наборе фидов, приближенном к реальности настолько, насколько это возможно.
Остается только ввести перелинковку данных чтобы картинка знала к какому документу она принадлежит, документ знал к какому фиду, а фид к категории.Это решается при создании таска. Фид, создавая таски для документов будет просто передавать себя или свой id как параметр таска.
Таймауты пришлось делать декоратором т.к. повесить таймаут на выполнение одной задачи штатными средствами кажется не представляется возможным. hmmВыглядит страшно :). Моя практика показывает, что когда в питоновом коде нечто выглядит сложно, то это либо спроектировано, либо реализовано неверно. Можете объяснить в чем проблема, которую вы решили таким способом?
Идея думаю по крайней мере спорнаясогласен, убедили. Но в текущей реализации меня смущает одна вещь. pool.join будет ждать пока все воркеры не завершаться, насколько я понимаю. Это не позволит достичь оптимальной загрузки канала. Нужно опускаться ниже, реализовывая создание и запуск Greenlet-ов самостоятельно. Это поможет не ждать, пока весь пул воркеров освободится, а сделать так, чтобы задачи подбирались не пачками, как сейчас, а сразу. Впрочем это можно и отложить на потом, поскольку лучшее - враг хорошего.
Офлайн
Кстати, а в сторону eventlet вы не смотрели? У него есть, например, GreenPool, который работает так, как нужно - создает и запускает задачи сразу, там нет этого мерзкого join-а. Кроме этого масса примеров, design patterns и другие вкусности.
С другой стороны автор gevent описывает почему он начал свой собственный проект вот здесь: http://blog.gevent.org/2010/02/27/why-gevent/ Почитайте, может что полезное найдете.
Офлайн
Я немного переделал dispatch и способ передачи параметров в worker. Теперь все будет работать так, как вы хотите. Оказывается join был не нужен: http://pastebin.com/M24z5RsV
Офлайн
Уже что-то вырисовывается. Это все еще рабочая версия, дальше не вижу смысла писать абстрактный код - пора причесывать модуль, писать рабочий код, выкидывать логгирование и оформлять комментариями.
Решил запостить лишь для того, чтобы не казалось что советы ушли в /dev/null
Спасибо за советы!
EdДа я сделал ресерч по библиотекам в самом начале и все таки выбрал gevent
Кстати, а в сторону eventlet вы не смотрели? У него есть, например, GreenPool, который работает так, как нужно - создает и запускает задачи сразу, там нет этого мерзкого join-а. Кроме этого масса примеров, design patterns и другие вкусности.
С другой стороны автор gevent описывает почему он начал свой собственный проект вот здесь: http://blog.gevent.org/2010/02/27/why-gevent/ Почитайте, может что полезное найдете.
import gevent
import logging as log
import time
import uuid
from random import randint
from gevent import queue, pool, monkey, sleep
from functools import wraps
def with_timeout(timeout, exception=False):
'''Timeout decorator
Maybe usefull for functions that are unable to watch it's timeout
by themselves.
``timeout`` - seconds
``exception``- if set to None, gevent.Timeout will be raisen.
False value will suppress any exception and other values will be
raised as exception.
Usage:
@with_timeout(5) # 5 seconds timeout set
def foo_handler(task, queue, bar):
pass
'''
def factory(fn):
@wraps(fn)
def decorator(*args):
with gevent.Timeout(timeout, exception):
return fn(*args)
return decorator
return factory
class ProcessingError(Exception):
pass
class HandlerError(ProcessingError):
pass
class HandlerTimeout(HandlerError):
pass
class BaseProcessing(object):
'''Abstract processing class designed for conrete tasks support
You can subclass this class, build your own logic and then simply
call to instance.dispatch(tasks) in order to process your tasks asynchronously
'''
def __init__(self, max_workers=1, handlers=None, log_file='runtime.txt', \
apply_patch=True):
'''Setup processor:
``handlers`` - dictionary of custom task handlers, this handlers
has higher priority than thoose defined in class object itself.
Key of dictionary element maps to task name
``max_workers`` - how much asynchronicity you need?
``log_file`` - enable process log, usefull for debug, None value
disables logging functionality
'''
self.handlers = handlers
if handlers is None:
self.handlers = {}
self.max_workers = max_workers
self.pool = pool.Pool(self.max_workers)
self.queue = queue.PriorityQueue()
self.log = None
if log_file is not None:
self.log = log
self.log.basicConfig(filename=log_file, level=log.DEBUG)
if apply_patch:
monkey.patch_all()
self.start_time = 0
self.end_time = 0
self.finished = False
def debug(self, message):
'''Saves message to logfile if enabled'''
if self.log:
self.log.debug(message)
def get_stats(self):
'''Current status of jobs'''
if self.finished:
return "Jobs done in {:.3}s".format(self.end_time - self.start_time)
return "Jobs are not finished yet"
def dispatch(self, tasks):
'''Run defined asynchronous tasks in an endless loop
``tasks`` iterator object of tasks.
Single task typically looks like (priority, task, ...)
'''
self.start_time = time.time()
for _task in tasks:
self.queue.put(_task)
while True:
while self.pool.free_count():
try:
task = self.queue.get_nowait()
except queue.Empty:
break
# self.debug("Got taks from queue: {}".format(task[:3]))
handler = getattr(self, task[1], None)
if task[1] in self.handlers:
handler = self.handlers[task[1]]
if handler is None:
raise HandlerError, \
"Required handler for task {} not defined".format(task[1])
# pass pool to task, could be used for adding tasks dynamically
self.pool.spawn(handler, self.pool, *(task[2:]))
# self.debug("Started task: {}".format(task[1]))
if self.pool.free_count() == self.max_workers:
self.end_time = time.time()
self.finished = True
break
sleep(1)
class RssProcessing(BaseProcessing):
def __init__(self, **kwargs):
super(RssProcessing, self).__init__(**kwargs)
@with_timeout(1.5)
def image_handler(self, pool, *task):
self.debug('image_handler on {} started'.format(task[0]))
sleep(randint(0, 5))
self.debug('image_handler on {} completed'.format(task[0]))
@with_timeout(3)
def document_handler(self, pool, *task):
self.debug('document_handler on {} started'.format(task[0]))
for i in range(randint(0, 50)):
self.queue.put((0, 'image_handler', uuid.uuid4()))
sleep(randint(0, 4))
self.debug('document_handler {} completed'.format(task[0]))
@with_timeout(3.5)
def feed_handler(self, pool, *task):
self.debug('feed_handler on {} started'.format(task[0]))
for i in range(randint(1, 10)):
self.queue.put((1, 'document_handler', uuid.uuid4()))
sleep(randint(0, 3))
self.debug('feed_handler {} completed'.format(task[0]))
if __name__ == '__main__':
# def foo(pool, *task):
# sleep(randint(1,3))
# processing.debug("Task {} finished".format(task[0]))
# print task, pool
tasks = [(2, 'feed_handler', uuid.uuid4()) for i in xrange(100)]
# handlers = {
# 'foo': foo,
# }
processing = RssProcessing(max_workers=500)
processing.dispatch(tasks)
print processing.get_stats()
Офлайн
Дальнейшие советы и критика еще нужны? Глядя на ваш код у меня есть чего сказать :)
Офлайн
Ed
Я только за :) но некоторые моменты стоит опустить:
1. Отсутствие тестов, к мокапу их писать имхо безсмысленно
2. Отсутствие комментариев - опять же к более реальной реализации они появятся.
3. Логгирование - сделано криво заведомо, думаю в продакшн версии логгирование вообще не пригодится, проверять нужно тестами а не чтением логов.
4. Не все эксепшены используются, опять же это связано с тестовой реализацией - как толькоо хендлеры будут реальными то и использование появится
5. Декоратор with_timeout не идеален, но без него придется в каждом хендлере использовать gevent.Timeout наглядно, это немного DRY но с другой стороны более гибкое апи получается.
6. __name__ = __main__ не поддерживает работы из консоли, опять же тесты будут сделаны в виде юнит тестов а не засоряния пакета лишним кодом.
7. Мне не нравится апи к хендлерам.
Создавая гринлет так self.pool.spawn(handler, self.pool, *(task))
я задаю такое жесткое апи хендлерам: image_handler(self, pool, *task):
Как это лучше поправить в логову не приходит.
Сама же реализация мне нравится - у нас есть абстрактный класс исполнения задач в асинхронном режиме, для конкретных задач нужно лишь написать наследника, либо можно передать словать хендлеров тасков в абстрактный класс и запустить задачи. Т.к в хендлере есть доступ в пул, то добавлять задачи может как хендлер встроенный (метод наследника базового класса) так и внешняя функция. Приоритетность тоже имхо хорошее решение - либо метод будет найден в классе, либо в списке хендлеров.
С удовольствием выслушаю комментарии.
Офлайн
Да конечно получается апи завязанное на gevent но преждевременно писать более абстрактную реализацию наверное нет смысла, я ведь решаю конкретную задачу с использованием конкретных инструментов.
Офлайн
ОК, принимая во внимание ваши ограничения моя критика и пожелания такая:
1. Имея три разных исключения вы создаете проблемы на верхнем уровне. Их же нужно будет ловить потом. Проще бросать одно исключение с информативными сообщениями.
2. Не уверен, что это именно имелось в виду под кривым логированием, но на всякий случай напишу: настройка логирования в классе BaseProcessing не очень красиво выглядит. Лучше конфигурить его отдельно. Вот, почитайте здесь как это можно сделать: http://www.python.su/forum/viewtopic.php?pid=59190#p59190
3. Я ничего не имею против декоратора with_timeout как такового. Мне не нравится реализация с тремя уровнями замыканий. Декораторы с аргументами лучше делать через классы, по-моему. Вот пример: http://www.artima.com/weblogs/viewpost.jsp?thread=240845
4. Можно пронаследовать BaseProcessing от Pool, по-моему так код будет чище. Это типа такой пул с функцией диспетчеризации. Можно и обозвать его соответственно.
5. Какой смысл в handlers = None? Кто процессить-то будет, если у нас такие дефолты?
6. Насчет жесткого вызова spawn я не понял. По-моему тут наоборот любой набор параметров можно использовать. Первые два в задаче - приоритет и название, они пропускаются. Остальные - любые.
Офлайн