Уведомления

Группа в Telegram: @pythonsu

#1 Ноя. 8, 2011 17:33:50

Spectral
От:
Зарегистрирован: 2010-07-13
Сообщения: 51
Репутация: +  1  -
Профиль   Отправить e-mail  

Реализация параллельной загрузки картинок в Python

Добрый вечер!

Подскажите пожалуйста, через какой модуль лучше всего реализовывать такую задачку :
есть список URL картинок, которые можно скачать, например, с помощью urlretrieve,
хочется, чтобы скрипт умел загружать картинки параллельно, примерно по такой схеме -
новую параллельную загрузку не начинать, пока не закончили скачиваться, например, 10 уже имеющихся загрузок.
Оптимально ли будет просто написать цикл по циклу, в котором стартуются функции загрузки и применён метод join (или joinall)?

Так вот, через какой модуль лучше всего это делать - Threading или subprocess?
Может ли кто-нибудь подкинуть какой-нибудь готовый пример, где реализована параллельная загрузка?
Благодарю за внимание.



Офлайн

#2 Ноя. 8, 2011 17:35:46

s0rg
От:
Зарегистрирован: 2011-06-05
Сообщения: 777
Репутация: +  25  -
Профиль   Отправить e-mail  

Реализация параллельной загрузки картинок в Python

http://docs.python.org/library/multiprocessing.html

Отредактировано (Ноя. 8, 2011 17:40:04)

Офлайн

#3 Ноя. 8, 2011 20:54:43

bw
От:
Зарегистрирован: 2007-09-26
Сообщения: 938
Репутация: +  20  -
Профиль   Адрес электронной почты  

Реализация параллельной загрузки картинок в Python

Потоков достаточно. Примерный алгоритм элементарен:
1. Создаёшь очередь Queue.
2. Заполняешь очередь (опционально, зависит от алгоритма).
3. Запускаешь 10 потоков, все они имеют один алгоритм.
4. Рабочий поток берёт из очереди задачу (если нет ничего – ждёт или или завершается, или завершается по спец. маркеру полученному из очереди) и выполняет её.
5. Главный поток пополняет очередь заданиями или нихрена не делает, ожидает завершения рабочих.

..bw



Офлайн

#4 Ноя. 10, 2011 21:01:22

Spectral
От:
Зарегистрирован: 2010-07-13
Сообщения: 51
Репутация: +  1  -
Профиль   Отправить e-mail  

Реализация параллельной загрузки картинок в Python

Снова добрый вечер!
Спасибо bw за наводку к подходу.
Остановился на таком подходе (код не совсем рабочий):

from Queue import Queue
from threading import Thread

source = [obj1, obj2, obj3]
num_worker_threads = ...

def do_work(item):
...
return result

def worker(q):
while True:
item = q.get()
do_work(item)
q.task_done()

q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker, args=(q,))
t.daemon = True
t.start()

for item in source:
q.put(item)

q.join()
Теперь применительно к своей задаче я написал ещё и внешний цикл ко всему этому (естественно, определения функций из него вынес),
чтобы формировать очереди снова и снова. Получится вполне рабочий скрипт, НО:
в конце концов скрипт с ошибкой “thread.error: can't start new thread”, сообщение об ошибке от треда с номером то ли за 600.
Может ли кто-нибудь подсказать, в чём дело или как нужно подправить скрипт?
Я думал, что метод q.join ожидает окончания всех загруженных тредов и очищается сам, а вместо этого он только наполняется
новыми тредами, несмотря на то, что q пересоздается внутри цикла. Как быть?
UPD.:
Хм, наверно из-за while True: тред и живет вечно)

UPD № 2.:
Да, функцию worker переписать вот так:
def worker(q):
item = q.get()
do_work(item)
q.task_done()
Вопрос снимается! :)



Отредактировано (Ноя. 10, 2011 23:06:53)

Офлайн

#5 Ноя. 13, 2011 08:55:22

bw
От:
Зарегистрирован: 2007-09-26
Сообщения: 938
Репутация: +  20  -
Профиль   Адрес электронной почты  

Реализация параллельной загрузки картинок в Python

> Хм, наверно из-за while True: тред и живет вечно)
> функцию worker переписать вот так
Плохо переписал, он там нужен.

Как я говорил, есть тысяча и один способ завершения рабочего потока, зависит от деталей, например от того как очередь пополняется:

1. Очередь заполнена до начала работы и если она опустошается, то это всё.

q = Queue.Queue()
while True:
try:
task = q.get_nowait()
except Queue.Empty:
break
do_somthing(task)
2. Очередь будет пополняться задачами во время работы (используем некий маркер для выхода, “пустую” задачу; он пихается в очередь раздатчиком задач до тех пора пока очередь пуста и пока есть работающие потоки).
_go_home_yankee = object()
q = Queue.Queue()
while True:
task = q.get()
if task is _go_home_yankee:
break
do_somthing(task)
В общем это всё совершенно элементарно. И это всё без join'а, не люблю я его, да как и потоки :-).
Что касается task_done + join + daemon + while, почему не сработало не знаю, task_done и join никогда не пользовался, но while там нужен обязательно.

..bw



Отредактировано (Ноя. 13, 2011 08:56:29)

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version