Уведомления

Группа в Telegram: @pythonsu

#1 Май 23, 2012 10:10:32

grok
От:
Зарегистрирован: 2010-09-09
Сообщения: 74
Репутация: +  0  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Привет всем! Сразу к делу.. Есть задача обрабатывать данные в параллельно работающих процессах, где каждый из них устанавливает соединение с БД и последовательно выполняет транзакции. Процессы создаются при помощи multiprocessing, тип их общения - каналы.
Но вот незадача, при создании очередного процесса он обрывает соединение предыдущего. Как я понимаю обрывает их именно сам Postgres. Подскажите, как одновременно можно создать несколько подключений?
Спасибо. Часть исходных файлов ниже.
Класс для работы с БД:

class DB():
    def __init__(self):
        self.__connect = psycopg2.connect("host='%s' port='%d' dbname='%s' user='%s' password='%s'" % (host, port, db_name, db_user, db_password))
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
    def __del__(self):
        self.close()
    def query(self, query):
        cursor = self.__connect.cursor(cursor_factory = psycopg2.extras.RealDictCursor)
        try:
            cursor.execute(query)
        except psycopg2.ProgrammingError as error:
            print error
            return False
        try:
            a = cursor.fetchall()
            cursor.close()
            self.commit()
            return a
        except StandardError as error:
            print error
            return []
    def commit(self):
        return self.__connect.commit()
    def close(self):
        return self.__connect.close()

Таким образом создаются || процессы:
class Test():
    def __init__(self, t4db):
        self.t4db = t4db
    def out(self, input_p, sql):
        sleep(20) # сделал задержку дабы посмотреть количество соединений
        input_p.send(self.t4db.query(sql))
def consumer(pipe, len_out):
    output_p, input_p = pipe
    input_p.close()
    out = 
    while True:
        out.append(output_p.recv())
        if len(out) == len_out:
            input_p.close()
            break
    print out
def producer(queries, input_p):
    for sql in queries:
        t4db = Trimis4_DB() # создаем новый экземпляр класса DB, а значит и создаем новое соединение с БД!
        t4t = Test(t4db)
        print id(t4db), id(t4rr) # смотрим идентификаторы экземпляров и убеждаемся, что для всех 5-ти sql они разные!
        trt = multiprocessing.Process(target=t4t.out, args=(input_p, sql))
        trt.start()
queries = [sql1, sql2, sql3, sql4, sql5]
(output_p, input_p) = multiprocessing.Pipe()
cons_p = multiprocessing.Process(target=consumer, args=((output_p, input_p), len(queries)))
cons_p.start()
producer(queries, input_p)
cons_p.join()

В итоге вылетает ошибка БД следующего вида:
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

Что и объясняет, как ни создавай новые соединения, старое соединение обрывается, новое создается.



Отредактировано grok (Май 23, 2012 10:15:09)

Офлайн

#2 Май 23, 2012 17:32:53

Lexander
От:
Зарегистрирован: 2008-09-19
Сообщения: 1139
Репутация: +  33  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

http://www.postgresql.org/docs/9.1/static/ecpg-connect.html
Раздел 33.2.2. Choosing a Connection



Офлайн

#3 Май 24, 2012 11:40:01

grok
От:
Зарегистрирован: 2010-09-09
Сообщения: 74
Репутация: +  0  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Проблема разрешилась следующим образом: не знаю почему, но один и тот же процесс не может создать больше одного соединения с БД, что выше и продемонстрировано; стоило перенести создание соединения в функцию, которая запускается параллельно, и, вуаля, создается еще одно соединение, что, собственно, и требовалось.



Офлайн

#4 Ноя. 27, 2012 22:59:05

buran
Зарегистрирован: 2012-11-27
Сообщения: 4
Репутация: +  0  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Была такая же проблема и тоже с multiprocessing,
решив обрадовался, но рано … возникла новая фигня:
когда пареллельно пишуться в базу новые данные
и они же нужны для дальнейшей работы позже (дёргаются из базы)
получается каша:
1-ый процесс установил соединение с базой дёрнул нужные данные,
сделал свои действия, проверил что “награбленных” новых данных нет в базе
попытался их закомитить
2-ой процесс также установил соединение с базой и тоже дёрнул данные
и среди них были данные, которые уже обрабатывал 1-ый процесс
он также прошёл по пути 1-го процесса и попытался закомитить данные
которые УЖЕ закомител или комитит 1-ый процесс

Всё это дело усугубляет большое число воркеров работающих над общей задачей одновременно.
Ясно что можно ставить какие-то локи но тогда не будет всё так быстро и эффект от параллельности
сводится к мимнимуму …

В конце конов пришёл к выводу что транзакции мне не нужны,
при большом числе апдейтов и селектов,

conn.autocommit = True
и эффект каши свёлся к минимуму

Офлайн

#5 Ноя. 28, 2012 20:42:08

doza_and
От:
Зарегистрирован: 2010-08-15
Сообщения: 4138
Репутация: +  252  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing


buran
и тоже дёрнул данные
и среди них были данные, которые уже обрабатывал 1-ый процесс

Думаю автокоммит тут не особо поможет. Он просто замажет проблему.

Есть разные высокоуровневые шаблоны многопоточного исполнения которые сводят кашу на нет. Хороший пример библиотеки с алгоритмами такого вида:
http://threadingbuildingblocks.org/

В базовом питоне тоже есть реализация одного из алгоритмов такого вида - parallelmap
http://docs.python.org/2/library/multiprocessing.html
см
pool.map



Офлайн

#6 Ноя. 29, 2012 00:15:37

buran
Зарегистрирован: 2012-11-27
Сообщения: 4
Репутация: +  0  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Думаю автокоммит тут не особо поможет. Он просто замажет проблему.

Помог и здорово, зачем юзать транзакзии если не используешь ролбэки и прочее что они предоставляют?
Просто потому что это круто?

Пул мап и юзается, doza_and, ты не вник в суть проблемы,
проблема в том Postgresql для каджой сессии (транзакции) предоставляет
“снимок” данных базы и пока 1-ый процесс что-то делает, данные в базе уже поменялись
другими процессами, 1-ый процесс их не увидит никак,
в этом была проблема …

Использование автокоммита не замазывает проблему оно
её решает до приемлемого уровня: апдейты происходят быстрее в разы и
несогласованость данных базы для N одновременно работающий процессов
практически сводится на нет.

Офлайн

#7 Ноя. 29, 2012 04:46:16

PooH
От:
Зарегистрирован: 2006-12-05
Сообщения: 1948
Репутация: +  72  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

buran
проблема в том Postgresql для каджой сессии (транзакции) предоставляет
“снимок” данных базы и пока 1-ый процесс что-то делает, данные в базе уже поменялись
другими процессами, 1-ый процесс их не увидит никак
Вообще то для транзакции есть разные уровни изоляции Transaction Isolation



Вот здесь один из первых отарков съел лаборанта. Это был такой умный отарк, что понимал даже теорию относительности. Он разговаривал с лаборантом, а потом бросился на него и загрыз…

Офлайн

#8 Ноя. 29, 2012 19:24:46

buran
Зарегистрирован: 2012-11-27
Сообщения: 4
Репутация: +  0  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Вообще то для транзакции есть разные уровни изоляции Transaction Isolation

Я знаю, а вы читали чем они отличаются ?
Как это может помочь в моём случае?

Serializable я не хочу использовать,
он будет сводить на нет параллельность.

У меня такое ощущение, что Вы пишите лишь бы написать,
моё же стремление было намекнуть автору чтобы он проверил
всё ли ок с БД и тем как стало работать его приложение с ней, не спешил рано радоваться,
т.к. многопросессность/многопоточность коварна …

Офлайн

#9 Ноя. 29, 2012 21:03:22

Lexander
От:
Зарегистрирован: 2008-09-19
Сообщения: 1139
Репутация: +  33  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Если вы о посте grok, то он решил вопрос, используя именованные соединения, хотя и неявно.
Вынес коннект в отдельную функцию и при ее вызове каждый раз создавалось новое соединение с новым именем (имя), через которое и работал дальше.
Т.к. текста запросов нет, возможно, что у него тоже проблема с грязными данными.

Если о вашем деле, то налицо неправильно выбранный алгоритм.
Это классическая задача с одной очередью и несколькими воркерами. Погуглите решение.
Вы сделали неправильно.
У вас просто транзакции короче стали. Но исходной проблемы вы не убрали.
Как вы правильно заметили, вы замазали проблему.
Для наколенной программки пойдет, а если это где-то в реальной системе, то проблемы все еще впереди.



Офлайн

#10 Ноя. 29, 2012 23:11:56

buran
Зарегистрирован: 2012-11-27
Сообщения: 4
Репутация: +  0  -
Профиль   Отправить e-mail  

psycopg2 && multiprocessing

Хех … и Вы туда же

Если о вашем деле, то налицо неправильно выбранный алгоритм.
Вы сделали неправильно.
У вас просто транзакции короче стали. Но исходной проблемы вы не убрали.

Вы прямо-таки специалист-ясновидащий,
ещё не видели больного а сразу поставили диагноз!

У меня создаются соединения внутри воркера и очереди и разделяемые массивы данных и евенты - всё используется как надо, проблему я убрал,
я не просил помощи, всё работает должным образом,
вы же спешите “насоветовать” не надо так делать.

Моя задача интересна и довольна сложна не из-а таких мелочей,
поверьте, разрешите откланяться, а то в этой теме писать == тратить время

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version