Найти - Пользователи
Полная версия: psycopg2 && multiprocessing
Начало » Базы данных » psycopg2 && multiprocessing
1
grok
Привет всем! Сразу к делу.. Есть задача обрабатывать данные в параллельно работающих процессах, где каждый из них устанавливает соединение с БД и последовательно выполняет транзакции. Процессы создаются при помощи multiprocessing, тип их общения - каналы.
Но вот незадача, при создании очередного процесса он обрывает соединение предыдущего. Как я понимаю обрывает их именно сам Postgres. Подскажите, как одновременно можно создать несколько подключений?
Спасибо. Часть исходных файлов ниже.
Класс для работы с БД:
class DB():
    def __init__(self):
        self.__connect = psycopg2.connect("host='%s' port='%d' dbname='%s' user='%s' password='%s'" % (host, port, db_name, db_user, db_password))
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
    def __del__(self):
        self.close()
    def query(self, query):
        cursor = self.__connect.cursor(cursor_factory = psycopg2.extras.RealDictCursor)
        try:
            cursor.execute(query)
        except psycopg2.ProgrammingError as error:
            print error
            return False
        try:
            a = cursor.fetchall()
            cursor.close()
            self.commit()
            return a
        except StandardError as error:
            print error
            return []
    def commit(self):
        return self.__connect.commit()
    def close(self):
        return self.__connect.close()

Таким образом создаются || процессы:
class Test():
    def __init__(self, t4db):
        self.t4db = t4db
    def out(self, input_p, sql):
        sleep(20) # сделал задержку дабы посмотреть количество соединений
        input_p.send(self.t4db.query(sql))
def consumer(pipe, len_out):
    output_p, input_p = pipe
    input_p.close()
    out = 
    while True:
        out.append(output_p.recv())
        if len(out) == len_out:
            input_p.close()
            break
    print out
def producer(queries, input_p):
    for sql in queries:
        t4db = Trimis4_DB() # создаем новый экземпляр класса DB, а значит и создаем новое соединение с БД!
        t4t = Test(t4db)
        print id(t4db), id(t4rr) # смотрим идентификаторы экземпляров и убеждаемся, что для всех 5-ти sql они разные!
        trt = multiprocessing.Process(target=t4t.out, args=(input_p, sql))
        trt.start()
queries = [sql1, sql2, sql3, sql4, sql5]
(output_p, input_p) = multiprocessing.Pipe()
cons_p = multiprocessing.Process(target=consumer, args=((output_p, input_p), len(queries)))
cons_p.start()
producer(queries, input_p)
cons_p.join()

В итоге вылетает ошибка БД следующего вида:
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

Что и объясняет, как ни создавай новые соединения, старое соединение обрывается, новое создается.
Lexander
http://www.postgresql.org/docs/9.1/static/ecpg-connect.html
Раздел 33.2.2. Choosing a Connection
grok
Проблема разрешилась следующим образом: не знаю почему, но один и тот же процесс не может создать больше одного соединения с БД, что выше и продемонстрировано; стоило перенести создание соединения в функцию, которая запускается параллельно, и, вуаля, создается еще одно соединение, что, собственно, и требовалось.
buran
Была такая же проблема и тоже с multiprocessing,
решив обрадовался, но рано … возникла новая фигня:
когда пареллельно пишуться в базу новые данные
и они же нужны для дальнейшей работы позже (дёргаются из базы)
получается каша:
1-ый процесс установил соединение с базой дёрнул нужные данные,
сделал свои действия, проверил что “награбленных” новых данных нет в базе
попытался их закомитить
2-ой процесс также установил соединение с базой и тоже дёрнул данные
и среди них были данные, которые уже обрабатывал 1-ый процесс
он также прошёл по пути 1-го процесса и попытался закомитить данные
которые УЖЕ закомител или комитит 1-ый процесс

Всё это дело усугубляет большое число воркеров работающих над общей задачей одновременно.
Ясно что можно ставить какие-то локи но тогда не будет всё так быстро и эффект от параллельности
сводится к мимнимуму …

В конце конов пришёл к выводу что транзакции мне не нужны,
при большом числе апдейтов и селектов,
conn.autocommit = True
и эффект каши свёлся к минимуму
doza_and

buran
и тоже дёрнул данные
и среди них были данные, которые уже обрабатывал 1-ый процесс

Думаю автокоммит тут не особо поможет. Он просто замажет проблему.

Есть разные высокоуровневые шаблоны многопоточного исполнения которые сводят кашу на нет. Хороший пример библиотеки с алгоритмами такого вида:
http://threadingbuildingblocks.org/

В базовом питоне тоже есть реализация одного из алгоритмов такого вида - parallelmap
http://docs.python.org/2/library/multiprocessing.html
см
pool.map
buran
Думаю автокоммит тут не особо поможет. Он просто замажет проблему.

Помог и здорово, зачем юзать транзакзии если не используешь ролбэки и прочее что они предоставляют?
Просто потому что это круто?

Пул мап и юзается, doza_and, ты не вник в суть проблемы,
проблема в том Postgresql для каджой сессии (транзакции) предоставляет
“снимок” данных базы и пока 1-ый процесс что-то делает, данные в базе уже поменялись
другими процессами, 1-ый процесс их не увидит никак,
в этом была проблема …

Использование автокоммита не замазывает проблему оно
её решает до приемлемого уровня: апдейты происходят быстрее в разы и
несогласованость данных базы для N одновременно работающий процессов
практически сводится на нет.
PooH
buran
проблема в том Postgresql для каджой сессии (транзакции) предоставляет
“снимок” данных базы и пока 1-ый процесс что-то делает, данные в базе уже поменялись
другими процессами, 1-ый процесс их не увидит никак
Вообще то для транзакции есть разные уровни изоляции Transaction Isolation
buran
Вообще то для транзакции есть разные уровни изоляции Transaction Isolation

Я знаю, а вы читали чем они отличаются ?
Как это может помочь в моём случае?

Serializable я не хочу использовать,
он будет сводить на нет параллельность.

У меня такое ощущение, что Вы пишите лишь бы написать,
моё же стремление было намекнуть автору чтобы он проверил
всё ли ок с БД и тем как стало работать его приложение с ней, не спешил рано радоваться,
т.к. многопросессность/многопоточность коварна …

Lexander
Если вы о посте grok, то он решил вопрос, используя именованные соединения, хотя и неявно.
Вынес коннект в отдельную функцию и при ее вызове каждый раз создавалось новое соединение с новым именем (имя), через которое и работал дальше.
Т.к. текста запросов нет, возможно, что у него тоже проблема с грязными данными.

Если о вашем деле, то налицо неправильно выбранный алгоритм.
Это классическая задача с одной очередью и несколькими воркерами. Погуглите решение.
Вы сделали неправильно.
У вас просто транзакции короче стали. Но исходной проблемы вы не убрали.
Как вы правильно заметили, вы замазали проблему.
Для наколенной программки пойдет, а если это где-то в реальной системе, то проблемы все еще впереди.
buran
Хех … и Вы туда же

Если о вашем деле, то налицо неправильно выбранный алгоритм.
Вы сделали неправильно.
У вас просто транзакции короче стали. Но исходной проблемы вы не убрали.

Вы прямо-таки специалист-ясновидащий,
ещё не видели больного а сразу поставили диагноз!

У меня создаются соединения внутри воркера и очереди и разделяемые массивы данных и евенты - всё используется как надо, проблему я убрал,
я не просил помощи, всё работает должным образом,
вы же спешите “насоветовать” не надо так делать.

Моя задача интересна и довольна сложна не из-а таких мелочей,
поверьте, разрешите откланяться, а то в этой теме писать == тратить время

This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB