Уведомления

Группа в Telegram: @pythonsu

#1 Дек. 7, 2015 09:03:08

PDA
Зарегистрирован: 2015-12-07
Сообщения: 10
Репутация: +  0  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

Добрый день.
Начал изучать Python, наконец добрался до потоков.
При выполнении практической задачи возникла сложность с которой не могу справиться самостоятельно.
Задача следующая:
есть папка- размер около 3 Гигабайт, в папке лежат файлы со статистикой, файлы могут быть от 300 Кб до 3 Мб.
у всех файлов форомат один и тот же, фактически это список из 11 элементов. В качестве разделителя используется “,” Вообщем проблем парсить нет.
Проблемы появляются с агригацией данных. Когда выполнение идёт одним потоком, то всё считается верно и выходит нужный результат, только выполнение программы длится около 30 минут, что не приемлемо.
При выполнении несколькими потоками скорость значительно увеличиввается, но встаёт проблема агрегации данных. каждый поток выводит результат обработки своего файла, а нужн общий.
Собственно вопрос, как собрать данные от всех потоков в кучу?

Офлайн

#2 Дек. 7, 2015 09:51:24

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 10016
Репутация: +  857  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

PDA
каждый поток выводит результат обработки своего файла, а нужн общий.
Слишком неконретное описание задачи. Можно представить десяток реализаций, которые делают это.

PDA
Собственно вопрос, как собрать данные от всех потоков в кучу?
Ну, там очередь должна быть, куда они добавляют данные, а отдельный поток должен читать из неё.



Офлайн

#3 Дек. 7, 2015 09:59:04

PDA
Зарегистрирован: 2015-12-07
Сообщения: 10
Репутация: +  0  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

py.user.next
Слишком неконретное описание задачи. Можно представить десяток реализаций, которые делают это.

PDA
Напишу как я реализовал:

import os
import re
import datetime
import threading
import queue
import time
import sys
Q = queue.Queue(maxsize=2500)
def finddir():
    now_date = datetime.date.today()
    my_date = datetime.datetime.strftime(now_date, "%Y%m%d")
    print(my_date)
    main_dir= "folder"
    goal_dir= main_dir+str(my_date+"/")
    os.chdir(goal_dir)
    print(os.getcwd())
    dir_list = os.listdir(os.getcwd())
    new_list = []
    for item in dir_list:
            if re.findall("(^xxx.*)", item):
                a = re.findall("(^xxx.*)", item)[0]
            new_list.append(a)
    return new_list
def parse(file_name):
            cdr = open(file_name, "r")
            data_list = [0,0]
            for sms in cdr.readlines():
                sms = sms.split(",")
                if len(sms)== 11:
                    # mt counting
                    if sms[2]!=0:
                        data_list[0]=data_list[0]+1
                    # mo counting
                    if sms[2]=='0':
                        data_list[1] =data_list[1]+1
                  
            cdr.close()
            return data_list
def put(new_list):
    q_list=[]
    for item in new_list:
        q_list.append(item)
        Q.put(item)
def worker():
    mt = 0
    mo = 0
    
    in_list= [0,0]
    out_list=[0,0]
    data_list = {'mt':"", 'mo':""}
    in_list= [0,0]
    global item
    while True:
        if Q.empty(): sys.exit()
        item = Q.get_nowait()
        in_list=parse(item)
        mt = mt+in_list[0]
        mo = mo+in_list[1
        print(mt)
        data_list['mt']=mt
        data_list['mo']=mo
        Q.task_done()
def Main():
    put(finddir())
    for x in range(0,10):
        print("Thread t",x," have been started")
        t= threading.Thread(target=worker)
        t.start()
Main()

Всего у меня 5 функций и очередь.
1 функция поиск необходимой папки с файлами, реализация по дате
2 собственно поиск необходимых данных в файлах
3 заполнение очереди
4 функция которая запускает парсер, и заполняет очередь
5 Главная функция где потокам выдаётся задача

Отредактировано PDA (Дек. 7, 2015 10:09:27)

Офлайн

#4 Дек. 7, 2015 10:01:29

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 10016
Репутация: +  857  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

Используй теги кода, они для того и придуманы, чтобы код точно передавать.



Офлайн

#5 Дек. 7, 2015 10:04:40

PDA
Зарегистрирован: 2015-12-07
Сообщения: 10
Репутация: +  0  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

py.user.next
Используй теги кода, они для того и придуманы, чтобы код точно передавать.
Я к сожалению не знаю этих тегов. Если можно пришли их и я оформлю, как полагается

Офлайн

#6 Дек. 7, 2015 10:06:57

FishHook
От:
Зарегистрирован: 2011-01-08
Сообщения: 8312
Репутация: +  568  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

PDA
попробуйте потыкать разные кнопочки из меню формы редактирования сообщения



Офлайн

#7 Дек. 7, 2015 10:09:57

PDA
Зарегистрирован: 2015-12-07
Сообщения: 10
Репутация: +  0  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте


FishHook
попробуйте потыкать разные кнопочки из меню формы редактирования сообщения
Спасибо, научился.

Офлайн

#8 Дек. 7, 2015 11:38:23

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 10016
Репутация: +  857  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

PDA
Напишу как я реализовал:
Вот сначала напиши так, чтобы пути к файлам одним потоком складывались в одну очередь. В очереди они должны идти раздельно.

В это время другой поток берёт путь из этой очереди и передаёт его в обработчик пути, который складывает свои результаты в другую очередь.
Так у тебя будут одновременно работать несколько обработчиков, складывая свои результаты в общую очередь результатов.

При этом вторая очередь уже должна читаться потоком, который суммирует результаты из неё и выводит на экран или в файл.



Офлайн

#9 Дек. 7, 2015 11:54:32

PDA
Зарегистрирован: 2015-12-07
Сообщения: 10
Репутация: +  0  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

py.user.next
Вот сначала напиши так, чтобы пути к файлам одним потоком складывались в одну очередь. В очереди они должны идти раздельно.

В это время другой поток берёт путь из этой очереди и передаёт его в обработчик пути, который складывает свои результаты в другую очередь.
Так у тебя будут одновременно работать несколько обработчиков, складывая свои результаты в общую очередь результатов.

При этом вторая очередь уже должна читаться потоком, который суммирует результаты из неё и выводит на экран или в файл.

То есть получается функцию put засунуть в отдельный поток?, а в функции worker уже исключительно вытаскивать?
что типа этого:
import os
import re
import datetime
import threading
import queue
import time
import sys
Q = queue.Queue(maxsize=2500)
def finddir():
    now_date = datetime.date.today()
    my_date = datetime.datetime.strftime(now_date, "%Y%m%d")
    print(my_date)
    main_dir= "folder"
    goal_dir= main_dir+str(my_date+"/")
    os.chdir(goal_dir)
    print(os.getcwd())
    dir_list = os.listdir(os.getcwd())
    new_list = []
    for item in dir_list:
            if re.findall("(^xxx.*)", item):
                a = re.findall("(^xxx.*)", item)[0]
            new_list.append(a)
    return new_list
def parse(file_name):
            cdr = open(file_name, "r")
            data_list = [0,0]
            for sms in cdr.readlines():
                sms = sms.split(",")
                if len(sms)== 11:
                    # mt counting
                    if sms[2]!=0:
                        data_list[0]=data_list[0]+1
                    # mo counting
                    if sms[2]=='0':
                        data_list[1] =data_list[1]+1
                  
            cdr.close()
            return data_list
def put(new_list):
    q_list=[]
    for item in new_list:
        q_list.append(item)
        Q.put(item)
def worker():
    mt = 0
    mo = 0
    
    in_list= [0,0]
    out_list=[0,0]
    data_list = {'mt':"", 'mo':""}
    in_list= [0,0]
    global item
    while True:
        if Q.empty(): sys.exit()
        item = Q.get_nowait()
        in_list=parse(item)
        mt = mt+in_list[0]
        mo = mo+in_list[1
        print(mt)
        data_list['mt']=mt
        data_list['mo']=mo
        Q.task_done()
def Main():
    putThread= threading.Thread(target=put,args=finddir)
    putThread.start()
    for x in range(0,10):
        print("Thread t",x," have been started")
        t= threading.Thread(target=worker)
        t.start()
Main()

Отредактировано PDA (Дек. 7, 2015 11:55:31)

Офлайн

#10 Дек. 7, 2015 12:09:57

PDA
Зарегистрирован: 2015-12-07
Сообщения: 10
Репутация: +  0  -
Профиль   Отправить e-mail  

Сбор результатов выполнения потоков в одном месте

py.user.next
Вот сначала напиши так, чтобы пути к файлам одним потоком складывались в одну очередь. В очереди они должны идти раздельно.

В это время другой поток берёт путь из этой очереди и передаёт его в обработчик пути, который складывает свои результаты в другую очередь.
Так у тебя будут одновременно работать несколько обработчиков, складывая свои результаты в общую очередь результатов.

При этом вторая очередь уже должна читаться потоком, который суммирует результаты из неё и выводит на экран или в файл.


Начал пытаться реализовать и тут же возник вопрос:
Функция Put до запуска потоков уже заполнила очередь файлами, зачем её в отдельный поток выводить?
То есть на момент запуска первого потока уже есть файлы, с котороми можно работать.

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version