Найти - Пользователи
Полная версия: gevent - Не знаю как решить задачу с парралелизированием запросов
Начало » Network » gevent - Не знаю как решить задачу с парралелизированием запросов
1 2 3 4
Silverstorm
Привет!

В рамках более глубокого изучения питона решил написать реальный проект - агрегатор блогосферы польского интернета. Проблема появилась в моменте изпользования gevent (http://www.gevent.org) библиотеки для парралельного доступа к удаленному контенту. Ранее я не решал таких задачь и не хочу изобретать свой велосипед. Поэтому решил обратиться за помощью к комьюнити.

Вкратце, схема моей работы выглядит так:


Описание схемы:
У меня есть база данных RSS лент (около 1000) разделенная по категориям. Каждый час нужно проверять их состояние и обрабатывать новые item's в лентах. Обработка заключается в загрузке контента, анализе и поиске картинок для приготовления превью item'a.

Как это происходит в однопоточном режиме (примерный, упрощенный код):

for url, etag, l_mod in rss_urls:
rss_feed = process_rss(url, etag, l_mod) # Read url with last etag, l_mod values
if not rss:
continue

for new_item in rss_feed: # Iterate via *new* items in feed
element = fetch_content(new_item) # Direct https request, download HTML source
if not element:
continue

images = extract_images(element)
goodImages = []
for img in images:
if img_qualify(img): # Download and analyze image if it could be used as a thumbnail
goodImages.append(img)
Т.е. я в цикле пробегаюсь по массиву всех рсс фидов, забираю xml, только тех, у которых появилось что-то новенькое. Далее в цикле анализирую каждый новый элемент (рутинная работа - сохраняю титул, постоянный урл и другие метаданные item'a). На последнем этапе - я пробегаюсь по загруженному документу из фида и анализирую все найденные картинки, чтобы сделать из них превью тумбу.

Т.е. в итоге я делаю http запросы в таких моментах:
- получение рсс документа
- получение item'a из рсс документа
- получение всех картинок рсс документа

Естественно это занимает непростительно много времени даже на моем домашнем интернете т.к. все запросы блокирующие и идут в один поток. Я решил использовать gevent т.к. о нем много хороших отзывов и судя по документации он довольно прост в использовании, как раз для таких задачь.

Чтобы я хотел получить в результате:
- Возможность стабильного ограничения количества исходящих соединений.
- Возможность парралельной загрузки нескольких рсс фидов, парралельного доступа к их новым item'мам и естественно парралельной загрузки картинок каждого item'a.

Как проще/лучше всего этого добиться?

Все что у меня пока есть это идея написать класс который сохраняет в себе количество запущенных задач (и сбрасывает это значение в базу данных с некоторой перидичностью), класс умеет запускать новые задачи и отслеживает событие выполнения какой то задачи. Должен получится некоторый враппер для gevent однако - как приготовить его малой кровью, я пока не знаю и не до конца уверен что такая задача решается именно придуманым способом. Я не хочу сильно (вообще?) менять функции, непосредственно отвественные за вызов удаленного соединения. В идеале думаю правильнее всего должен получится результат, когда системе все равно есть gevent или его нет - если его нет, то все будет запускаться в однопоточном режиме (удобно для тестирования), если же он есть - система будет использовать указанное количество Pool с определенными таймаутами.

Буду рад любым наставническим советам.
Спасибо!
Ed
Если именно gevent хочется использовать, то ограничить количество потоков можно с помощью этого: http://www.gevent.org/gevent.pool.html#gevent.pool.Pool. Параллелизация же вообще является основной функцией gevent.
Посмотрите эти 2 примера и задавайте более конкретные вопросы, а то складывается ощущение, что вы документацию по gevent не читали.
http://bitbucket.org/denis/gevent/src/1ceaaec15768/examples/dns_mass_resolve.py
http://bitbucket.org/denis/gevent/src/1ceaaec15768/examples/concurrent_download.py
Silverstorm
Нет, как же - документацию я прочитал вдоль и поперек. В ней не хватает примеров, по крайней мере для новичка.

Если я правильно понимаю - то что мне нужно лежит в http://www.gevent.org/intro.html#cooperative-multitasking
Однако я не могу разобраться что конкретно мне нужно - Queue, JoinableQueue или Event :(

Сжимая инфу из первого поста задача у меня такая - есть некий несколькоэтапный процесс (получение фидов, затем посещение урлов из фида, затем загрузка и анализ картинок из посещенных урлов) каждый этап добавляет количество внешних ресурсов к обработке.

Т.е. грубо говоря было 100 фидов, набралось 1000 урлов к посещению, каждый урл содержить 10 картинок, получилось 10.000 урлов к посещению
Задача в том чтобы на каждом этапе можно было добавлять в очередь (Queue) noвый пакет урлов к обработке не превышая пула.
Zubchick
мне недавно насоветовали celery, там есть, кажется, все что вам нужно. Расписания, параллельные таски, под-таски, очереди. А ведь я тоже хотел gevent использовать. Видимо он хорошо пеареный :)
Ed
Silverstorm
Т.е. грубо говоря было 100 фидов, набралось 1000 урлов к посещению, каждый урл содержить 10 картинок, получилось 10.000 урлов к посещению
Задача в том чтобы на каждом этапе можно было добавлять в очередь (Queue) noвый пакет урлов к обработке не превышая пула.
Ну так вот и используйте Queue и Pool.
Я бы делал так:

1. Создавал бы очередь и кидал бы в нее сначала задачи первого этапа:
from gevent import  queue

wqueue = queue.Queue()
for i in range(10):
wqueue.put(('first_task', i, wqueue)) # очередь сюда передается, чтобы задача могла добавлять туда задачи второго этапа
В качестве задачи здесь используется кортеж. Первый элемент - имя задачи, остальные - параметры. код, который эту задачу выполняет будет знать что это за параметры.

2. Создавал бы пул:
from gevent import pool
wpool = pool.Pool(max_workers)
3. И соответствие обработчиков задач их именам:
workers = {"first_task": do_first, "second_task": do_second}
4. А потом тупо диспетчеризировал:
    # dispatch tasks
while True:
if not wpool.full():
while True:
try:
task = wqueue.get_nowait()
except queue.Empty:
break
LOG.debug("do_it: starting task %s" % task[0])
wpool.spawn(workers[task[0]], task)
if wpool.free_count() == max_workers:
break
wpool.join()

gevent.sleep(1)
5. Ну,а в обработчике первого уровня кроме основного функционала создавал бы задачи второго уровня:
def do_first(task):
task_id, wqueue = task[1:]
LOG.debug("do_first %d", task_id)
# ... do something useful here
# add second step tasks to the same queue
for i in range(randint(1, 10)):
wqueue.put(("second_task", "%d-%d" % (task_id, i)))
gevent.sleep(randint(1, 10))
LOG.debug("do_first %d done" % task_id)
Ну и для полноты картинки вот вам do_second:
def do_second(task):
task_id = task[1]
LOG.debug("do_second %s", task_id)
gevent.sleep(randint(1, 2))
LOG.debug("do_second %s", task_id)
Но тут есть один нюанс. При таком подходе пока не выполнятся таски первого уровня таски второго не начнут выполняться. Если такой алгоритм не подходит, то вместо Pool нужно самому создавать и запускать Greenlet-ы и следить за тем, чтобы их было не больше, чем надо. Но это уже другая история :)

PS: В вашем случае задачи первого этапа - получение фидов, задача второго - посещение урлов из фидов ну и т.д.
Silverstorm
Ed
Большое спасибо за примеры, сегодня будет долгая и продуктивная ночь :) стоя в пробке примерно такой же алгоритм укладывал в голове, с поправкой на задачи первого уровня - чтобы не ждать пока все фиды загрузяться уметь начинать работать с уже собранными данными (иначе пришлось бы держать оочень много данных в памяти процесса что не есть хорошо)

Также думаю ввести какие-то идентификаторы задачь чтобы например обрабатывая картинку знать от какого она документа, обрабатывая документ знать из какого он фида и так по нарастающей. Это должно дать возможность высвобождать память и сохранять обработанные результаты во время выполнения скрипта а не в конце когда все задачи выполнены.
Ed
Я думаю большого смысла в смешивании задач первого и второго уровня нет, так как узкое место у нас - соединения, а они будут заняты по любому. У вас же задачи так или иначе связаны с соединениями. Не вижу надобности начинать ходить по урлам(задача второго уровня) пока не обработаны все фиды.
Хотя, если вам удастся сделать это малой кровью, не сильно усложняя код, то наверное это имеет смысл. Причина проста - если количество задач одного уровня некратно количеству процессов в пуле, то на последней итерации пул будет недогружен. Не думаю, что это такая большая проблема, но если хочется сделать все красиво, то почему бы и нет.
Silverstorm
Рабочая версия синтетического теста получилась. (уж простите за простыню)
Решил использовать PriorityQueue так как он позволяет решить задачу приоритетности задач, сейчас получается что важнее закончить обработку документа чем начинать обрабатывать новые фиды.

Таймауты пришлось делать декоратором т.к. повесить таймаут на выполнение одной задачи штатными средствами кажется не представляется возможным. :/
Вопрос - не извращение ли помещать with as внутри try except ?

Пожалуйста покритикуйте код - включая стиль, мне очень пригодится. (Отсутствие докстрингов преднамеренное)

import gevent
import logging as log
import time
import uuid

from random import randint
from gevent import queue, pool, monkey
from functools import wraps

monkey.patch_all()

log.basicConfig(filename='runtime.txt', level=log.DEBUG)

start_time = time.time()
jobs_done = 0
jobs_processed = 0

def with_timeout(timeout):
def factory(fn):
@wraps(fn)
def decorator(task):
global jobs_done, jobs_processed # just for testing purpose, will gone in production
try:
jobs_done += 1
jobs_processed += 1
with gevent.Timeout(timeout):
return fn(task)
except gevent.Timeout:
jobs_done -= 1
log.debug("Task `{0}:{1}` timed out".format(task[1], task[2]))
return decorator
return factory

@with_timeout(6)
def get_feed(task):
task_id, job_queue = task[2:4]
log.debug("get_feed {0} downloading...".format(task_id))
# Feed downloaded, new items extracted
for i in range(randint(1, 10)):
job_queue.put((2, 'get_item', uuid.uuid4(), job_queue))
gevent.sleep(randint(0, 5))
log.debug('get_feed {0} done'.format(task_id))

@with_timeout(3)
def get_item(task):
task_id, job_queue = task[2:4]
log.debug("get_item {0} downloading...".format(task_id))
# Item downloaded, images extracted
for i in range(randint(1, 50)):
job_queue.put((1, 'get_image', uuid.uuid4(), job_queue))
gevent.sleep(randint(0, 5))
log.debug('get_item {0} done'.format(task_id))

@with_timeout(3)
def get_image(task):
task_id, job_queue = task[2:4]
log.debug("get_image {0} downloading...".format(task_id))
# Downloading image...
gevent.sleep(randint(0, 5))
log.debug('get_item {0} done'.format(task_id))

MAX_WORKERS = 100
workers = {
'get_feed': get_feed,
'get_item': get_item,
'get_image': get_image,
}

job_queue = queue.PriorityQueue()
job_pool = pool.Pool(MAX_WORKERS)

# add all feeds to queue
for i in xrange(5):
job_queue.put((3, 'get_feed', uuid.uuid4(), job_queue))

def dispatch():
while True:
if not job_pool.full():
while True:
try:
task = job_queue.get_nowait()
except queue.Empty:
break
log.debug("Starting task {0}:{1}".format(task[1], task[2]))
job_pool.spawn(workers[task[1]], task)
if job_pool.free_count() == MAX_WORKERS:
break
job_pool.join()

gevent.sleep(1)

dispatch()
log.debug("Total jobs created: {0}".format(jobs_processed))
log.debug("Total jobs done: {0}".format(jobs_done))
log.debug("Processed in {:.3} sec".format(time.time() - start_time))
Каким эмпирическим путем можно подобрать правильное количество воркеров в пуле?
Остается только ввести перелинковку данных чтобы картинка знала к какому документу она принадлежит, документ знал к какому фиду, а фид к категории.
Silverstorm
Ed
Я думаю большого смысла в смешивании задач первого и второго уровня нет, так как узкое место у нас - соединения, а они будут заняты по любому. У вас же задачи так или иначе связаны с соединениями. Не вижу надобности начинать ходить по урлам(задача второго уровня) пока не обработаны все фиды.
Теоретически вы правы, но принимая во внимание что фидов будет около 2000 то можно предугадать что в определенные моменты времени каждый из них будет иметь не менее 5 новых документов к обработке, т.е. 10.000 документов а так как они обрабатываются по части lxml то памяти процесс может сожрать прилично (даже если считать что один документ это 50кб html данных то, если не использовать приоритетности обработки документов в конце у нас будет одних данных в памяти на 500мб) поэтому я решил ввести приоритетность - как только обработка документа закончена - мы можем смело его сбросить в бд и освободить память.
Ed
Поревьювил слегка ваш код.
1. Нет __name__ == ‘__main__’ и иже с ним. См. сюда: http://www.artima.com/weblogs/viewpost.jsp?thread=4829
2. Пользуйте pylint, он рулез. В следующий раз с pylint rate < 9 не приходите :)
3. log.basicConfig(filename='runtime.txt'… - расширение .txt для конфигурационного файла смотрится не очень.
4. Какой смысл в числе в кортеже task. Не вижу где он используется. Я правильно понял, что это id и вы его собираетесь в базу класть или что-то типа этого? Если да, то это делается не так.
5. Остальной код фактически мой или я не вижу ваших изменений. Вы тут вроде спорили со мной, а где код, где реализация вашей точки зрения?
6. У Вас была хорошая идея обернуть это в класс и уметь работать как с gevent, так и без него. Тоже не вижу попыток ее реализовать. Класс поможет избавиться от глобальных переменных, что тоже неплохо.
7. Докстринги вы убрали для уменьшения размера кода? Тогда лучше их вернуть и юзать для ревью более подходящие тулзы, например это: http://pastebin.com/ или еще лучше это: http://codereview.appspot.com/
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB