Форум сайта python.su
0
Добрый день.
Прошу помочь.
Имею параллельное выполнения функции в 2 потока и мне надо поставить ограничение количества выполнений в минуту суммарно по потокам.
Код
import httplib2 import json from queue import Queue from multiprocessing.dummy import Pool as ThreadPool def get_data(url): h = httplib2.Http(".cache") resp, content = h.request(url, "GET") global sj_data try: sj_data = (json.loads(content.decode("utf-8"))['objects']) except : pass return(sj_data) h = httplib2.Http(".cache") pool = ThreadPool(2) results = pool.map(get_data, link) pool.close()
Офлайн
221
такое ощущение что сдезь нужна очередь - и контролируется проще. модуль Queue
Офлайн
0
Спасибо!
А на простом примере не могли бы продемонстрировать? Думаю многим было бы полезно.
Отредактировано k.palyanichka (Июль 23, 2015 14:54:59)
Офлайн
24
from multiprocessing.dummy import Pool import time counter = 0 limit = 5 cycle_start = time.time() cycle_end = time.time() def foo(bar): global counter global cycle_start global cycle_end if counter == limit: cycle_end = time.time() cycle_duration = cycle_end - cycle_start time.sleep(60-cycle_duration) cycle_start = time.time() counter = 0 print(bar) counter += 1 pool = Pool(2) pool.map(foo, range(50))
Отредактировано ayb (Июль 24, 2015 08:37:16)
Офлайн
221
k.palyanichka
Наскоряк накидал учебный пример. Если надо позже объясню.
import Queue, time, threading, datetime number_of_thread = 2 max_task_per_time = 2 time_period = 5 tasks = [2 * x for x in range(5)] tasks.extend(range(5)) #tasks = [1, 2, 4, 2, 3, 5, 1, 2] tasks = [1]*10 queue = Queue.Queue() for index, task in enumerate(tasks): queue.put((index,task)) def work(task): time.sleep(task) list_of_times = [] def check_time(): if len(list_of_times) >= max_task_per_time: if time.time()-list_of_times[-max_task_per_time] > time_period: del list_of_times[:-max_task_per_time-1] return True else: False else: return True def worker(): while True: if not check_time(): print('not time, wait ...') time.sleep(1) continue index, task = queue.get() list_of_times.append(time.time()) print('{:%M:%S} Index = {} task = {} get'.format(datetime.datetime.now(), index, task)) work(task) print('{:%M:%S} Index = {} task = {} done'.format(datetime.datetime.now(), index, task)) queue.task_done() for i in range(number_of_thread): thread = threading.Thread(target=worker) thread.setDaemon(True) thread.start() queue.join()
Офлайн
3
Чтото както мудрено, а так чем не подходит?
def worker(): while 1: cur_task = q.get() print cur_task import Queue import threading import time q = Queue.Queue() threading.Thread(target = worker).start() threading.Thread(target = worker).start() # два потока :) while #не настодоест: q.put(#таска) time.sleep(60/#колво в минуту) #тут еще до кучи прикрутить условия завершения потоков #(например "кодовым словом" в Queue)
Отредактировано Iskatel (Авг. 3, 2015 07:07:31)
Офлайн
253
:) В заданной постановке задачи достаточно каждый поток ограничить половинным числом выполнений.
https://docs.python.org/2/library/sched.html
Отредактировано doza_and (Авг. 3, 2015 07:52:52)
Офлайн
3
doza_andВ заданной постановке задачи достаточно каждый поток ограничить половинным числом выполнений.https://docs.python.org/2/library/sched.html
def worker(): cur_task = q.get() print cur_task import Queue import threading import time q = Queue.Queue() q.put(#все таски сразу) while not q.empty(): threading.Thread(target = worker).start() time.sleep(60/#колво в минуту)
Отредактировано Iskatel (Авг. 3, 2015 15:29:51)
Офлайн
253
IskatelКак правильно заметили есть проблемы с тредами. В свое время не хватило его функциональности и стал использовать https://pypi.python.org/pypi/APScheduler
Забавный модуль
Офлайн
221
Iskatel
В задании не сказано что ограничение равно двум. Там сказано что потоков 2. А ограничение пусть будет любое N. Вы не ту задачу решаете. Поэтому нет не проще как у вас.
Офлайн