Уведомления

Группа в Telegram: @pythonsu

#1 Июль 23, 2015 11:29:57

k.palyanichka
Зарегистрирован: 2015-05-29
Сообщения: 20
Репутация: +  0  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

Добрый день.
Прошу помочь.

Имею параллельное выполнения функции в 2 потока и мне надо поставить ограничение количества выполнений в минуту суммарно по потокам.

Код

import httplib2
import json
from queue import Queue
from multiprocessing.dummy import Pool as ThreadPool 
def get_data(url):
    h = httplib2.Http(".cache")
    resp, content = h.request(url, "GET")
    global sj_data
    try:
       sj_data = (json.loads(content.decode("utf-8"))['objects'])
    except : pass
    return(sj_data)
h = httplib2.Http(".cache")
pool = ThreadPool(2) 
results = pool.map(get_data, link)
pool.close() 
link - список ссылок.

Спасибо!

Офлайн

#2 Июль 23, 2015 13:17:44

JOHN_16
От: Россия, Петропавловск-Камчатск
Зарегистрирован: 2010-03-22
Сообщения: 3292
Репутация: +  221  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

такое ощущение что сдезь нужна очередь - и контролируется проще. модуль Queue



_________________________________________________________________________________
полезный блог о python john16blog.blogspot.com

Офлайн

#3 Июль 23, 2015 14:54:18

k.palyanichka
Зарегистрирован: 2015-05-29
Сообщения: 20
Репутация: +  0  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

Спасибо!
А на простом примере не могли бы продемонстрировать? Думаю многим было бы полезно.

Отредактировано k.palyanichka (Июль 23, 2015 14:54:59)

Офлайн

#4 Июль 24, 2015 08:23:57

ayb
Зарегистрирован: 2014-04-01
Сообщения: 297
Репутация: +  24  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

from multiprocessing.dummy import Pool
import time
counter = 0
limit = 5
cycle_start = time.time()
cycle_end = time.time()
def foo(bar):
    global counter
    global cycle_start
    global cycle_end
    if counter == limit:
        cycle_end = time.time()
        cycle_duration = cycle_end - cycle_start
        time.sleep(60-cycle_duration)
        cycle_start = time.time()
        counter = 0
    print(bar)
    counter += 1
pool = Pool(2)
pool.map(foo, range(50))

Отредактировано ayb (Июль 24, 2015 08:37:16)

Офлайн

#5 Июль 24, 2015 13:37:09

JOHN_16
От: Россия, Петропавловск-Камчатск
Зарегистрирован: 2010-03-22
Сообщения: 3292
Репутация: +  221  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

k.palyanichka
Наскоряк накидал учебный пример. Если надо позже объясню.

import Queue, time, threading, datetime
number_of_thread = 2
max_task_per_time = 2
time_period = 5
tasks = [2 * x for x in range(5)]
tasks.extend(range(5))
#tasks = [1, 2, 4, 2, 3, 5, 1, 2]
tasks = [1]*10
queue = Queue.Queue()
for index, task in enumerate(tasks):
    queue.put((index,task))
def work(task):
    time.sleep(task)
list_of_times = []
def check_time():
    if len(list_of_times) >= max_task_per_time:
        if time.time()-list_of_times[-max_task_per_time] > time_period:
            del list_of_times[:-max_task_per_time-1]
            return True
        else:
            False
    else:
        return True
    
def worker():
    
    while True:
        if not check_time():
            print('not time, wait ...')
            time.sleep(1)
            continue
        
        index, task = queue.get()
        list_of_times.append(time.time())
        print('{:%M:%S} Index = {} task = {} get'.format(datetime.datetime.now(),
                                                          index,
                                                          task))
        work(task)
        print('{:%M:%S} Index = {} task = {} done'.format(datetime.datetime.now(),
                                                          index,
                                                          task))
        queue.task_done()
for i in range(number_of_thread):
    thread = threading.Thread(target=worker)
    thread.setDaemon(True)
    thread.start()
queue.join()



_________________________________________________________________________________
полезный блог о python john16blog.blogspot.com

Офлайн

#6 Авг. 3, 2015 07:06:46

Iskatel
Зарегистрирован: 2015-07-29
Сообщения: 291
Репутация: +  3  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

Чтото както мудрено, а так чем не подходит?

def worker():
    while 1:
        cur_task = q.get()
        print cur_task
    
    
import Queue
import threading
import time
q = Queue.Queue()
threading.Thread(target = worker).start()
threading.Thread(target = worker).start()
# два потока :)
while #не настодоест:
    q.put(#таска)
    time.sleep(60/#колво в минуту)
#тут еще до кучи прикрутить условия завершения потоков 
#(например "кодовым словом" в Queue)

Отредактировано Iskatel (Авг. 3, 2015 07:07:31)

Офлайн

#7 Авг. 3, 2015 07:52:12

doza_and
От:
Зарегистрирован: 2010-08-15
Сообщения: 4138
Репутация: +  253  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

:) В заданной постановке задачи достаточно каждый поток ограничить половинным числом выполнений.
https://docs.python.org/2/library/sched.html



Отредактировано doza_and (Авг. 3, 2015 07:52:52)

Офлайн

#8 Авг. 3, 2015 15:11:09

Iskatel
Зарегистрирован: 2015-07-29
Сообщения: 291
Репутация: +  3  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

doza_and
В заданной постановке задачи достаточно каждый поток ограничить половинным числом выполнений.https://docs.python.org/2/library/sched.html

Забавный модуль, только с потоками плохо дружит, по приведенной ссылке рекомендуют в потоках юзать threading.Timer. ИМХО модуль юзабелен, когда эвенты разные. А к данной задаче он мало применим… Как например тот пример, что в доке, зациклить чтоб обработал 1000 тасков? Костыли…

А данная постановка задачи вообще туманная. ИМХО автор сам не знает чего хочет добится. Задача имеет смысл только если поток отрабатывает быстрее отведенного ему времени. Т.е. при таком раскладе почти всегда будет работать только один поток, но “иногда”, когда первый притормозит и не успеет, подключится второй. Я чуть мозг не сломал, пытаясь понять: нафига их именно два?? Если неуспевания редки, то можно обойтись без потоков, все делать в основном. А если случаются частенько, тогда и второй может задержаться… Почему два? не проще ли както так:

def worker():
        cur_task = q.get()
        print cur_task
    
    
import Queue
import threading
import time
q = Queue.Queue()
q.put(#все таски сразу)
while not q.empty():
    threading.Thread(target = worker).start()
    time.sleep(60/#колво в минуту)

И в результате, раз в N секунд, будет новый поток.


ЗЫ. Автор жжешь… нафига вот эта строчка global sj_data??? У тебя все потоки пишут в одну глобальную переменную, и если контекст сменится между присвоением и return, то оба потока вернут одно и тоже - результат, который запишет последний выполнившийся поток

Отредактировано Iskatel (Авг. 3, 2015 15:29:51)

Офлайн

#9 Авг. 3, 2015 20:24:21

doza_and
От:
Зарегистрирован: 2010-08-15
Сообщения: 4138
Репутация: +  253  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

Iskatel
Забавный модуль
Как правильно заметили есть проблемы с тредами. В свое время не хватило его функциональности и стал использовать https://pypi.python.org/pypi/APScheduler



Офлайн

#10 Авг. 4, 2015 00:01:09

JOHN_16
От: Россия, Петропавловск-Камчатск
Зарегистрирован: 2010-03-22
Сообщения: 3292
Репутация: +  221  -
Профиль   Отправить e-mail  

Ограничения на параллельное выполнение функции

Iskatel
В задании не сказано что ограничение равно двум. Там сказано что потоков 2. А ограничение пусть будет любое N. Вы не ту задачу решаете. Поэтому нет не проще как у вас.



_________________________________________________________________________________
полезный блог о python john16blog.blogspot.com

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version