Найти - Пользователи
Полная версия: Ограничения на параллельное выполнение функции
Начало » Python для новичков » Ограничения на параллельное выполнение функции
1 2
k.palyanichka
Добрый день.
Прошу помочь.

Имею параллельное выполнения функции в 2 потока и мне надо поставить ограничение количества выполнений в минуту суммарно по потокам.

Код
import httplib2
import json
from queue import Queue
from multiprocessing.dummy import Pool as ThreadPool 
def get_data(url):
    h = httplib2.Http(".cache")
    resp, content = h.request(url, "GET")
    global sj_data
    try:
       sj_data = (json.loads(content.decode("utf-8"))['objects'])
    except : pass
    return(sj_data)
h = httplib2.Http(".cache")
pool = ThreadPool(2) 
results = pool.map(get_data, link)
pool.close() 
link - список ссылок.

Спасибо!
JOHN_16
такое ощущение что сдезь нужна очередь - и контролируется проще. модуль Queue
k.palyanichka
Спасибо!
А на простом примере не могли бы продемонстрировать? Думаю многим было бы полезно.
ayb
from multiprocessing.dummy import Pool
import time
counter = 0
limit = 5
cycle_start = time.time()
cycle_end = time.time()
def foo(bar):
    global counter
    global cycle_start
    global cycle_end
    if counter == limit:
        cycle_end = time.time()
        cycle_duration = cycle_end - cycle_start
        time.sleep(60-cycle_duration)
        cycle_start = time.time()
        counter = 0
    print(bar)
    counter += 1
pool = Pool(2)
pool.map(foo, range(50))
JOHN_16
k.palyanichka
Наскоряк накидал учебный пример. Если надо позже объясню.
import Queue, time, threading, datetime
number_of_thread = 2
max_task_per_time = 2
time_period = 5
tasks = [2 * x for x in range(5)]
tasks.extend(range(5))
#tasks = [1, 2, 4, 2, 3, 5, 1, 2]
tasks = [1]*10
queue = Queue.Queue()
for index, task in enumerate(tasks):
    queue.put((index,task))
def work(task):
    time.sleep(task)
list_of_times = []
def check_time():
    if len(list_of_times) >= max_task_per_time:
        if time.time()-list_of_times[-max_task_per_time] > time_period:
            del list_of_times[:-max_task_per_time-1]
            return True
        else:
            False
    else:
        return True
    
def worker():
    
    while True:
        if not check_time():
            print('not time, wait ...')
            time.sleep(1)
            continue
        
        index, task = queue.get()
        list_of_times.append(time.time())
        print('{:%M:%S} Index = {} task = {} get'.format(datetime.datetime.now(),
                                                          index,
                                                          task))
        work(task)
        print('{:%M:%S} Index = {} task = {} done'.format(datetime.datetime.now(),
                                                          index,
                                                          task))
        queue.task_done()
for i in range(number_of_thread):
    thread = threading.Thread(target=worker)
    thread.setDaemon(True)
    thread.start()
queue.join()
Iskatel
Чтото както мудрено, а так чем не подходит?

def worker():
    while 1:
        cur_task = q.get()
        print cur_task
    
    
import Queue
import threading
import time
q = Queue.Queue()
threading.Thread(target = worker).start()
threading.Thread(target = worker).start()
# два потока :)
while #не настодоест:
    q.put(#таска)
    time.sleep(60/#колво в минуту)
#тут еще до кучи прикрутить условия завершения потоков 
#(например "кодовым словом" в Queue)
doza_and
:) В заданной постановке задачи достаточно каждый поток ограничить половинным числом выполнений.
https://docs.python.org/2/library/sched.html
Iskatel
doza_and
В заданной постановке задачи достаточно каждый поток ограничить половинным числом выполнений.https://docs.python.org/2/library/sched.html

Забавный модуль, только с потоками плохо дружит, по приведенной ссылке рекомендуют в потоках юзать threading.Timer. ИМХО модуль юзабелен, когда эвенты разные. А к данной задаче он мало применим… Как например тот пример, что в доке, зациклить чтоб обработал 1000 тасков? Костыли…

А данная постановка задачи вообще туманная. ИМХО автор сам не знает чего хочет добится. Задача имеет смысл только если поток отрабатывает быстрее отведенного ему времени. Т.е. при таком раскладе почти всегда будет работать только один поток, но “иногда”, когда первый притормозит и не успеет, подключится второй. Я чуть мозг не сломал, пытаясь понять: нафига их именно два?? Если неуспевания редки, то можно обойтись без потоков, все делать в основном. А если случаются частенько, тогда и второй может задержаться… Почему два? не проще ли както так:

def worker():
        cur_task = q.get()
        print cur_task
    
    
import Queue
import threading
import time
q = Queue.Queue()
q.put(#все таски сразу)
while not q.empty():
    threading.Thread(target = worker).start()
    time.sleep(60/#колво в минуту)

И в результате, раз в N секунд, будет новый поток.


ЗЫ. Автор жжешь… нафига вот эта строчка global sj_data??? У тебя все потоки пишут в одну глобальную переменную, и если контекст сменится между присвоением и return, то оба потока вернут одно и тоже - результат, который запишет последний выполнившийся поток
doza_and
Iskatel
Забавный модуль
Как правильно заметили есть проблемы с тредами. В свое время не хватило его функциональности и стал использовать https://pypi.python.org/pypi/APScheduler
JOHN_16
Iskatel
В задании не сказано что ограничение равно двум. Там сказано что потоков 2. А ограничение пусть будет любое N. Вы не ту задачу решаете. Поэтому нет не проще как у вас.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB