Форум сайта python.su
0
Добрый день.
Начал изучать Python, наконец добрался до потоков.
При выполнении практической задачи возникла сложность с которой не могу справиться самостоятельно.
Задача следующая:
есть папка- размер около 3 Гигабайт, в папке лежат файлы со статистикой, файлы могут быть от 300 Кб до 3 Мб.
у всех файлов форомат один и тот же, фактически это список из 11 элементов. В качестве разделителя используется “,” Вообщем проблем парсить нет.
Проблемы появляются с агригацией данных. Когда выполнение идёт одним потоком, то всё считается верно и выходит нужный результат, только выполнение программы длится около 30 минут, что не приемлемо.
При выполнении несколькими потоками скорость значительно увеличиввается, но встаёт проблема агрегации данных. каждый поток выводит результат обработки своего файла, а нужн общий.
Собственно вопрос, как собрать данные от всех потоков в кучу?
Офлайн
857
PDAСлишком неконретное описание задачи. Можно представить десяток реализаций, которые делают это.
каждый поток выводит результат обработки своего файла, а нужн общий.
PDAНу, там очередь должна быть, куда они добавляют данные, а отдельный поток должен читать из неё.
Собственно вопрос, как собрать данные от всех потоков в кучу?
Офлайн
0
py.user.nextНапишу как я реализовал:
Слишком неконретное описание задачи. Можно представить десяток реализаций, которые делают это.
PDA
import os import re import datetime import threading import queue import time import sys Q = queue.Queue(maxsize=2500) def finddir(): now_date = datetime.date.today() my_date = datetime.datetime.strftime(now_date, "%Y%m%d") print(my_date) main_dir= "folder" goal_dir= main_dir+str(my_date+"/") os.chdir(goal_dir) print(os.getcwd()) dir_list = os.listdir(os.getcwd()) new_list = [] for item in dir_list: if re.findall("(^xxx.*)", item): a = re.findall("(^xxx.*)", item)[0] new_list.append(a) return new_list def parse(file_name): cdr = open(file_name, "r") data_list = [0,0] for sms in cdr.readlines(): sms = sms.split(",") if len(sms)== 11: # mt counting if sms[2]!=0: data_list[0]=data_list[0]+1 # mo counting if sms[2]=='0': data_list[1] =data_list[1]+1 cdr.close() return data_list def put(new_list): q_list=[] for item in new_list: q_list.append(item) Q.put(item) def worker(): mt = 0 mo = 0 in_list= [0,0] out_list=[0,0] data_list = {'mt':"", 'mo':""} in_list= [0,0] global item while True: if Q.empty(): sys.exit() item = Q.get_nowait() in_list=parse(item) mt = mt+in_list[0] mo = mo+in_list[1 print(mt) data_list['mt']=mt data_list['mo']=mo Q.task_done() def Main(): put(finddir()) for x in range(0,10): print("Thread t",x," have been started") t= threading.Thread(target=worker) t.start() Main()
Отредактировано PDA (Дек. 7, 2015 10:09:27)
Офлайн
857
Используй теги кода, они для того и придуманы, чтобы код точно передавать.
Офлайн
0
py.user.nextЯ к сожалению не знаю этих тегов. Если можно пришли их и я оформлю, как полагается
Используй теги кода, они для того и придуманы, чтобы код точно передавать.
Офлайн
568
PDA
попробуйте потыкать разные кнопочки из меню формы редактирования сообщения
Офлайн
0
FishHookСпасибо, научился.
попробуйте потыкать разные кнопочки из меню формы редактирования сообщения
Офлайн
857
PDAВот сначала напиши так, чтобы пути к файлам одним потоком складывались в одну очередь. В очереди они должны идти раздельно.
Напишу как я реализовал:
Офлайн
0
py.user.next
Вот сначала напиши так, чтобы пути к файлам одним потоком складывались в одну очередь. В очереди они должны идти раздельно.
В это время другой поток берёт путь из этой очереди и передаёт его в обработчик пути, который складывает свои результаты в другую очередь.
Так у тебя будут одновременно работать несколько обработчиков, складывая свои результаты в общую очередь результатов.
При этом вторая очередь уже должна читаться потоком, который суммирует результаты из неё и выводит на экран или в файл.
import os import re import datetime import threading import queue import time import sys Q = queue.Queue(maxsize=2500) def finddir(): now_date = datetime.date.today() my_date = datetime.datetime.strftime(now_date, "%Y%m%d") print(my_date) main_dir= "folder" goal_dir= main_dir+str(my_date+"/") os.chdir(goal_dir) print(os.getcwd()) dir_list = os.listdir(os.getcwd()) new_list = [] for item in dir_list: if re.findall("(^xxx.*)", item): a = re.findall("(^xxx.*)", item)[0] new_list.append(a) return new_list def parse(file_name): cdr = open(file_name, "r") data_list = [0,0] for sms in cdr.readlines(): sms = sms.split(",") if len(sms)== 11: # mt counting if sms[2]!=0: data_list[0]=data_list[0]+1 # mo counting if sms[2]=='0': data_list[1] =data_list[1]+1 cdr.close() return data_list def put(new_list): q_list=[] for item in new_list: q_list.append(item) Q.put(item) def worker(): mt = 0 mo = 0 in_list= [0,0] out_list=[0,0] data_list = {'mt':"", 'mo':""} in_list= [0,0] global item while True: if Q.empty(): sys.exit() item = Q.get_nowait() in_list=parse(item) mt = mt+in_list[0] mo = mo+in_list[1 print(mt) data_list['mt']=mt data_list['mo']=mo Q.task_done() def Main(): putThread= threading.Thread(target=put,args=finddir) putThread.start() for x in range(0,10): print("Thread t",x," have been started") t= threading.Thread(target=worker) t.start() Main()
Отредактировано PDA (Дек. 7, 2015 11:55:31)
Офлайн
0
py.user.next
Вот сначала напиши так, чтобы пути к файлам одним потоком складывались в одну очередь. В очереди они должны идти раздельно.
В это время другой поток берёт путь из этой очереди и передаёт его в обработчик пути, который складывает свои результаты в другую очередь.
Так у тебя будут одновременно работать несколько обработчиков, складывая свои результаты в общую очередь результатов.
При этом вторая очередь уже должна читаться потоком, который суммирует результаты из неё и выводит на экран или в файл.
Офлайн