Форум сайта python.su
Привет!
В рамках более глубокого изучения питона решил написать реальный проект - агрегатор блогосферы польского интернета. Проблема появилась в моменте изпользования gevent (http://www.gevent.org) библиотеки для парралельного доступа к удаленному контенту. Ранее я не решал таких задачь и не хочу изобретать свой велосипед. Поэтому решил обратиться за помощью к комьюнити.
Вкратце, схема моей работы выглядит так:
Описание схемы:
У меня есть база данных RSS лент (около 1000) разделенная по категориям. Каждый час нужно проверять их состояние и обрабатывать новые item's в лентах. Обработка заключается в загрузке контента, анализе и поиске картинок для приготовления превью item'a.
Как это происходит в однопоточном режиме (примерный, упрощенный код):
for url, etag, l_mod in rss_urls:
rss_feed = process_rss(url, etag, l_mod) # Read url with last etag, l_mod values
if not rss:
continue
for new_item in rss_feed: # Iterate via *new* items in feed
element = fetch_content(new_item) # Direct https request, download HTML source
if not element:
continue
images = extract_images(element)
goodImages = []
for img in images:
if img_qualify(img): # Download and analyze image if it could be used as a thumbnail
goodImages.append(img)
Отредактировано (Дек. 5, 2010 23:17:33)
Офлайн
Если именно gevent хочется использовать, то ограничить количество потоков можно с помощью этого: http://www.gevent.org/gevent.pool.html#gevent.pool.Pool. Параллелизация же вообще является основной функцией gevent.
Посмотрите эти 2 примера и задавайте более конкретные вопросы, а то складывается ощущение, что вы документацию по gevent не читали.
http://bitbucket.org/denis/gevent/src/1ceaaec15768/examples/dns_mass_resolve.py
http://bitbucket.org/denis/gevent/src/1ceaaec15768/examples/concurrent_download.py
Офлайн
Нет, как же - документацию я прочитал вдоль и поперек. В ней не хватает примеров, по крайней мере для новичка.
Если я правильно понимаю - то что мне нужно лежит в http://www.gevent.org/intro.html#cooperative-multitasking
Однако я не могу разобраться что конкретно мне нужно - Queue, JoinableQueue или Event :(
Сжимая инфу из первого поста задача у меня такая - есть некий несколькоэтапный процесс (получение фидов, затем посещение урлов из фида, затем загрузка и анализ картинок из посещенных урлов) каждый этап добавляет количество внешних ресурсов к обработке.
Т.е. грубо говоря было 100 фидов, набралось 1000 урлов к посещению, каждый урл содержить 10 картинок, получилось 10.000 урлов к посещению
Задача в том чтобы на каждом этапе можно было добавлять в очередь (Queue) noвый пакет урлов к обработке не превышая пула.
Офлайн
мне недавно насоветовали celery, там есть, кажется, все что вам нужно. Расписания, параллельные таски, под-таски, очереди. А ведь я тоже хотел gevent использовать. Видимо он хорошо пеареный :)
Офлайн
SilverstormНу так вот и используйте Queue и Pool.
Т.е. грубо говоря было 100 фидов, набралось 1000 урлов к посещению, каждый урл содержить 10 картинок, получилось 10.000 урлов к посещению
Задача в том чтобы на каждом этапе можно было добавлять в очередь (Queue) noвый пакет урлов к обработке не превышая пула.
from gevent import queue
wqueue = queue.Queue()
for i in range(10):
wqueue.put(('first_task', i, wqueue)) # очередь сюда передается, чтобы задача могла добавлять туда задачи второго этапа
from gevent import pool
wpool = pool.Pool(max_workers)
workers = {"first_task": do_first, "second_task": do_second}
# dispatch tasks
while True:
if not wpool.full():
while True:
try:
task = wqueue.get_nowait()
except queue.Empty:
break
LOG.debug("do_it: starting task %s" % task[0])
wpool.spawn(workers[task[0]], task)
if wpool.free_count() == max_workers:
break
wpool.join()
gevent.sleep(1)
def do_first(task):
task_id, wqueue = task[1:]
LOG.debug("do_first %d", task_id)
# ... do something useful here
# add second step tasks to the same queue
for i in range(randint(1, 10)):
wqueue.put(("second_task", "%d-%d" % (task_id, i)))
gevent.sleep(randint(1, 10))
LOG.debug("do_first %d done" % task_id)
def do_second(task):
task_id = task[1]
LOG.debug("do_second %s", task_id)
gevent.sleep(randint(1, 2))
LOG.debug("do_second %s", task_id)
Отредактировано (Дек. 6, 2010 20:13:00)
Офлайн
Ed
Большое спасибо за примеры, сегодня будет долгая и продуктивная ночь :) стоя в пробке примерно такой же алгоритм укладывал в голове, с поправкой на задачи первого уровня - чтобы не ждать пока все фиды загрузяться уметь начинать работать с уже собранными данными (иначе пришлось бы держать оочень много данных в памяти процесса что не есть хорошо)
Также думаю ввести какие-то идентификаторы задачь чтобы например обрабатывая картинку знать от какого она документа, обрабатывая документ знать из какого он фида и так по нарастающей. Это должно дать возможность высвобождать память и сохранять обработанные результаты во время выполнения скрипта а не в конце когда все задачи выполнены.
Офлайн
Я думаю большого смысла в смешивании задач первого и второго уровня нет, так как узкое место у нас - соединения, а они будут заняты по любому. У вас же задачи так или иначе связаны с соединениями. Не вижу надобности начинать ходить по урлам(задача второго уровня) пока не обработаны все фиды.
Хотя, если вам удастся сделать это малой кровью, не сильно усложняя код, то наверное это имеет смысл. Причина проста - если количество задач одного уровня некратно количеству процессов в пуле, то на последней итерации пул будет недогружен. Не думаю, что это такая большая проблема, но если хочется сделать все красиво, то почему бы и нет.
Отредактировано (Дек. 6, 2010 22:46:38)
Офлайн
Рабочая версия синтетического теста получилась. (уж простите за простыню)
Решил использовать PriorityQueue так как он позволяет решить задачу приоритетности задач, сейчас получается что важнее закончить обработку документа чем начинать обрабатывать новые фиды.
Таймауты пришлось делать декоратором т.к. повесить таймаут на выполнение одной задачи штатными средствами кажется не представляется возможным. :/
Вопрос - не извращение ли помещать with as внутри try except ?
Пожалуйста покритикуйте код - включая стиль, мне очень пригодится. (Отсутствие докстрингов преднамеренное)
import gevent
import logging as log
import time
import uuid
from random import randint
from gevent import queue, pool, monkey
from functools import wraps
monkey.patch_all()
log.basicConfig(filename='runtime.txt', level=log.DEBUG)
start_time = time.time()
jobs_done = 0
jobs_processed = 0
def with_timeout(timeout):
def factory(fn):
@wraps(fn)
def decorator(task):
global jobs_done, jobs_processed # just for testing purpose, will gone in production
try:
jobs_done += 1
jobs_processed += 1
with gevent.Timeout(timeout):
return fn(task)
except gevent.Timeout:
jobs_done -= 1
log.debug("Task `{0}:{1}` timed out".format(task[1], task[2]))
return decorator
return factory
@with_timeout(6)
def get_feed(task):
task_id, job_queue = task[2:4]
log.debug("get_feed {0} downloading...".format(task_id))
# Feed downloaded, new items extracted
for i in range(randint(1, 10)):
job_queue.put((2, 'get_item', uuid.uuid4(), job_queue))
gevent.sleep(randint(0, 5))
log.debug('get_feed {0} done'.format(task_id))
@with_timeout(3)
def get_item(task):
task_id, job_queue = task[2:4]
log.debug("get_item {0} downloading...".format(task_id))
# Item downloaded, images extracted
for i in range(randint(1, 50)):
job_queue.put((1, 'get_image', uuid.uuid4(), job_queue))
gevent.sleep(randint(0, 5))
log.debug('get_item {0} done'.format(task_id))
@with_timeout(3)
def get_image(task):
task_id, job_queue = task[2:4]
log.debug("get_image {0} downloading...".format(task_id))
# Downloading image...
gevent.sleep(randint(0, 5))
log.debug('get_item {0} done'.format(task_id))
MAX_WORKERS = 100
workers = {
'get_feed': get_feed,
'get_item': get_item,
'get_image': get_image,
}
job_queue = queue.PriorityQueue()
job_pool = pool.Pool(MAX_WORKERS)
# add all feeds to queue
for i in xrange(5):
job_queue.put((3, 'get_feed', uuid.uuid4(), job_queue))
def dispatch():
while True:
if not job_pool.full():
while True:
try:
task = job_queue.get_nowait()
except queue.Empty:
break
log.debug("Starting task {0}:{1}".format(task[1], task[2]))
job_pool.spawn(workers[task[1]], task)
if job_pool.free_count() == MAX_WORKERS:
break
job_pool.join()
gevent.sleep(1)
dispatch()
log.debug("Total jobs created: {0}".format(jobs_processed))
log.debug("Total jobs done: {0}".format(jobs_done))
log.debug("Processed in {:.3} sec".format(time.time() - start_time))
Офлайн
EdТеоретически вы правы, но принимая во внимание что фидов будет около 2000 то можно предугадать что в определенные моменты времени каждый из них будет иметь не менее 5 новых документов к обработке, т.е. 10.000 документов а так как они обрабатываются по части lxml то памяти процесс может сожрать прилично (даже если считать что один документ это 50кб html данных то, если не использовать приоритетности обработки документов в конце у нас будет одних данных в памяти на 500мб) поэтому я решил ввести приоритетность - как только обработка документа закончена - мы можем смело его сбросить в бд и освободить память.
Я думаю большого смысла в смешивании задач первого и второго уровня нет, так как узкое место у нас - соединения, а они будут заняты по любому. У вас же задачи так или иначе связаны с соединениями. Не вижу надобности начинать ходить по урлам(задача второго уровня) пока не обработаны все фиды.
Офлайн
Поревьювил слегка ваш код.
1. Нет __name__ == ‘__main__’ и иже с ним. См. сюда: http://www.artima.com/weblogs/viewpost.jsp?thread=4829
2. Пользуйте pylint, он рулез. В следующий раз с pylint rate < 9 не приходите :)
3. log.basicConfig(filename='runtime.txt'… - расширение .txt для конфигурационного файла смотрится не очень.
4. Какой смысл в числе в кортеже task. Не вижу где он используется. Я правильно понял, что это id и вы его собираетесь в базу класть или что-то типа этого? Если да, то это делается не так.
5. Остальной код фактически мой или я не вижу ваших изменений. Вы тут вроде спорили со мной, а где код, где реализация вашей точки зрения?
6. У Вас была хорошая идея обернуть это в класс и уметь работать как с gevent, так и без него. Тоже не вижу попыток ее реализовать. Класс поможет избавиться от глобальных переменных, что тоже неплохо.
7. Докстринги вы убрали для уменьшения размера кода? Тогда лучше их вернуть и юзать для ревью более подходящие тулзы, например это: http://pastebin.com/ или еще лучше это: http://codereview.appspot.com/
Офлайн