Найти - Пользователи
Полная версия: gevent - Не знаю как решить задачу с парралелизированием запросов
Начало » Network » gevent - Не знаю как решить задачу с парралелизированием запросов
1 2 3 4
Silverstorm
Ed
ОК, принимая во внимание ваши ограничения моя критика и пожелания такая:
1. Имея три разных исключения вы создаете проблемы на верхнем уровне. Их же нужно будет ловить потом. Проще бросать одно исключение с информативными сообщениями.
2. Не уверен, что это именно имелось в виду под кривым логированием, но на всякий случай напишу: настройка логирования в классе BaseProcessing не очень красиво выглядит. Лучше конфигурить его отдельно. Вот, почитайте здесь как это можно сделать: http://www.python.su/forum/viewtopic.ph … 190#p59190
3. Я ничего не имею против декоратора with_timeout как такового. Мне не нравится реализация с тремя уровнями замыканий. Декораторы с аргументами лучше делать через классы, по-моему. Вот пример: http://www.artima.com/weblogs/viewpost. … ead=240845
4. Можно пронаследовать BaseProcessing от Pool, по-моему так код будет чище. Это типа такой пул с функцией диспетчеризации. Можно и обозвать его соответственно.
5. Какой смысл в handlers = None? Кто процессить-то будет, если у нас такие дефолты?
6. Насчет жесткого вызова spawn я не понял. По-моему тут наоборот любой набор параметров можно использовать. Первые два в задаче - приоритет и название, они пропускаются. Остальные - любые.
1. Тут не согласен, иерархия исключений такова:
ProcessingError(Exception) - верхний уровень исключений. Исключения только относящиеся к самому BaseProcessing
HandlerError(ProcessingError) - различного рода ошибки связанные с обработчиком зачачи.
HandlerTimeout(HandlerError) - конкретная ошибка обработчика

Это пока единственное исключение брошенное в классе:
                if handler is None:
raise HandlerError, \
"Required handler for task {} not defined".format(task[1])
Если я правильно понимаю вы предлагаете сделать нечто вроде
ProcessingError(Exception)
HandlerGeneralError(ProcessingError)
HandlerTimeoutError(HandlerError)
HandlerMissingError(HandlerError)
?
2. Да жестко заданное логгирование у меня хромает, я смотрел как вы настраивали в своем примере http://pastebin.com/M24z5RsV и поправил у себя
3. Спасибо, ушел читать. Декоратор правда все равно получился бесполезный у меня :( т.к. если заставить его бросать исключение то словить его никто не может, оно выпадает “где-то” в гринлете, выводится в консоль но никак не ловится, оно и понятно почему. Поэтому таймаут обработку наверное нужно делать в самом обработчике либо делать заточенный декоратор под определенную задачу, чтобы не повторять себя. Это пока для меня черная магия.
4. Идея лежала на поверхности и я ее не заметил, спасибо.
5. handlers=None нужен для определения своих обработчиков. Это может быть не python waу, представим ситуацию когда в RssProcessing мне нужно переопределить image_handler малой кровью, я могу:
а) processing.image_handler = some_fnc
b) processing = RssProcessing(handlers={'image_handler': some_fnc})

А при запуске задачи следуйщий код определит кто будет обработчиком:
                handler = getattr(self, task[1], None)
if task[1] in self.handlers:
handler = self.handlers[task[1]]
Словарь обработчиков перекрывает определенные в классе.
6. Нет мне не очень нравится как потом выглядят сигнатуры обработчиков, хотя может я просто привередничаю :)
Ed
Silverstorm
1. Тут не согласен, иерархия исключений такова:
Это все прелесно, но давайте посмотрим на уровень повыше. Ваш код будет выглядеть как-нибудь так:
try:
processing = RssProcessing(max_workers=500)
processing.dispatch(tasks)
except ProcessingError, excobj:
# обработка одного исключения
except HandlerError, excobj:
# обработка второго исключения
except HandlerTimeout, excobj:
# обработка третьего исключения
Вот, что я предлагаю:
try:
processing = RssProcessing(max_workers=500)
processing.dispatch(tasks)
except ProcessingError, excobj:
print excobj # тут будет напечатано что именно случилось
....
То есть в модуле будет одно исключение, бросаться будет так, например:
raise ProcessingError, “Required handler for task {} not defined”.format(task)

И не нужно будет устраивать эту лапшу. Излишняя детализация тоже иногда вредна.

Декоратор правда все равно получился бесполезный у меня
попробуйте вот такой декоратор:
class TimeoutError(Exception):
pass

class with_timeout(object):
def __init__(self, timeout):
self.timeout = timeout

def __call__(self, func):
def raiser(signum, frame):
raise TimeoutError("Timeout expired")

def wrapped(*args):
saved = signal.signal(signal.SIGALRM, raiser)
signal.alarm(self.timeout)
try:
func(*args)
finally:
signal.signal(signal.SIGALRM, saved)
signal.alarm(0)
return wrapped
6. Нет мне не очень нравится как потом выглядят сигнатуры обработчиков, хотя может я просто привередничаю :)
Вы просто не поняли идею. Посмотрите мой код. Где там *task? Эта магия при вызове как раз и была задумана для того, чтобы сигнатуры обработчиков были осмысленными.
Silverstorm
Вот последняя реализация тестового мокапа, если еще не надоело смотреть мою лапшу: http://pastebin.com/Ep0EdukC

Пардон, не заметил предидущего сообщения:

1. С детализацией согласен, но возможно мне нужна будет статистика по исключениям (уж очень люблю статистику) например обработано фидов 1000, документов 10000, ошибочных, 500, по таймауту 300 итп. В начале эти данные очень пригодятся для доводки системы.

2.
Ed
Вы просто не поняли идею. Посмотрите мой код. Где там *task? Эта магия при вызове как раз и была задумана для того, чтобы сигнатуры обработчиков были осмысленными.
Да нет же, идею я понял, мне не нравится конкретно этот результат:
def document_handler(self, pool, *task):
foo = task[-1]
bar = task[-2]
task_id = task[0]
Оно конечно так и должно быть, это найболее гибкий вариант, просто именованые как то более привычны, при желании и их можно реализовать, вопрос стоит ли.

3. Попробую ваш декоратор таймаута, хотя чую все равно придется в каждом хендлере делать свой таймаут чекер, хотябы изза того чтобы в каждом хенлдере можно было выполнить какое-то уникальное действие.
Silverstorm
Хотя может быть я ошибаюсь но на декоратор можно повесить параметр вызова коллбек функции в случае неудачи :)

@with_timeout(4, image_failed)
def image_handler(self, *task):
и заставить декоратор вызывать image_failed(*task), только вот реализация мне почему-то кажется настолько же кривой как и просто RY в хендлерах.

ЗА декоратор:
- не каждому хендлеру нужен определенный коллбек в случае неудачи, для 80% хендлеров нужен стандартный логгирующий информацию о таймауте, для 20% нужен кастомный

ПРОТИВ декоратора:
- может расплодится черезмерное количество кастомных функции на случай таймаута хендлера в будущем

Думаю раскалд 80/20 реален, может быть немного более запутанный но все же без повторения одного и того же блока в хенлдерах
data = None
with Timeout(5, False):
data = mysock.makefile().readline()
if data is None:
... # 5 seconds passed without reading a line
else:
... # a line was read within 5 seconds
Ed
по поводу сигнатур хандлеров и * magic вы все-таки не поняли идею. Вот вам пример как правильно должен выглядеть ваш код, согласно идее:
def document_handler(self, handler, feed_id, category)
И никаких *task. На то она и магия :)
Я же советовал вам посмотреть мой код, там же нет никаких *task в сигнатуре.


По поводу таймаута у меня такая идея: сделать таймаут параметром задачи. С одной стороны это некий оверхед, но с другой это позволит одному и тому же типу задач назначать разный таймаут в зависимости от, например, статистики предыдущих вызовов, что круто.
при таком подходе нужно просто сделать в этом же классе врапперчик, который будет реализовывать вызов хандлера с таймаутом. И никаких декораторов, все дешево и сердито. Если нужно - нарисую вам код.
Ed
Вот, что получилось: http://pastebin.com/WCSAQ1Ua
Тут есть один момент, с которым вы сталкивались. При бросании эксепшена внутри воркера он никак не влияет на то, что снаружи. Поэтому чтобы прекратить dispatch можно создать таск с нулевым приоритетом, а в цикле dispatch при появлении оного вызывать специальный обработчик, который, как вариант, бросит эксепшен уже наверху.

И еще одно замечание по вашему коду. Я бы посоветовал обернуть работу с очередью хотя бы в отдельные методы. выставлять queue наружу и работать с ней во всех хандлерах плохо, недостаточный уровень абстракции. Если вместо self.queue.put(task) вы будете звать self.add_task(task) то вы отвязываетесь от Queue, и сможете потом заменить ее на что хотите, просто сохранив интерфейс. Понятно ли?
Silverstorm
Задачу с перехватом эксепшенов я сделал через коллбек. См:
http://pastebin.com/FSTPSuGJ , работает аналогично http://www.gevent.org/gevent.html#gevent.with_timeout

Ed
И еще одно замечание по вашему коду. Я бы посоветовал обернуть работу с очередью хотя бы в отдельные методы. выставлять queue наружу и работать с ней во всех хандлерах плохо, недостаточный уровень абстракции. Если вместо self.queue.put(task) вы будете звать self.add_task(task) то вы отвязываетесь от Queue, и сможете потом заменить ее на что хотите, просто сохранив интерфейс. Понятно ли?
Трезвая мысль, но тогда наверное и убрать родительский pool.Pool стоит, а вместо этого использовать self.pool = pool.Pool() чтобы к нему излишне не привязываться. А то от того что я сделаю метод add_task появится только косметическое изменение т.к. сущность светимая наружу и так продолжает быть наследником pool.Pool со всеми вытекающими.
Ed
Silverstorm
Задачу с перехватом эксепшенов я сделал через коллбек.
Не понял как это может работать. Коллбэк же будет зваться там же, в гринлете. Но раз работает, то и чудесно.
Вот то, что я имел в виду: http://pastebin.com/Jgy5xPyn
Мне такой подход нравится еще и возможностью в вашем случае в потомке определить какую угодно реакцию на разные ошибки. Таймаут просто одна из них.

Трезвая мысль, но тогда наверное и убрать родительский pool.
Не обязательно. Достаточно убрать его использование извне, заменив на некий API.

PS: С сигнатурами-то разобрались? Покажите что у вас получилось.
Silverstorm
Ed
Сигнатуры оставил как есть:

    @with_timeout(1, image_handler_failed)
def image_handler(self, *task):
'''Complete docs at real implementation!
Process image. This handler *knows* about document its working with, thats very usefull for consistency
of document object.

'''
self.debug('image_handler on {} started'.format(task[0]))
document = task[1]
document['images'].append(task[0])
sleep(randint(0, 5))
# decrease number of possible images for this documents that's are awaiting to check
document['images_cnt'] -= 1
self.flush_results()
self.debug('image_handler on {} completed'.format(task[0]))
Ed
Вот то, что я имел в виду: http://pastebin.com/Jgy5xPyn
Мне такой подход нравится еще и возможностью в вашем случае в потомке определить какую угодно реакцию на разные ошибки. Таймаут просто одна из них.
Да согласен, такое поведение получается хоть и немного не по DRY (т.к. таймаут у меня везде нужен) но более гибкое, возможно я вернусь к нему в реальной имплементации.
Ed
Silverstorm
Сигнатуры оставил как есть:
:( А что так? Вам же не нравились эти *task. Мне так тоже не нравится. поскольку непонятно.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB