Форум сайта python.su
Привет всем! Сразу к делу.. Есть задача обрабатывать данные в параллельно работающих процессах, где каждый из них устанавливает соединение с БД и последовательно выполняет транзакции. Процессы создаются при помощи multiprocessing, тип их общения - каналы.
Но вот незадача, при создании очередного процесса он обрывает соединение предыдущего. Как я понимаю обрывает их именно сам Postgres. Подскажите, как одновременно можно создать несколько подключений?
Спасибо. Часть исходных файлов ниже.
Класс для работы с БД:
class DB(): def __init__(self): self.__connect = psycopg2.connect("host='%s' port='%d' dbname='%s' user='%s' password='%s'" % (host, port, db_name, db_user, db_password)) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) def __del__(self): self.close() def query(self, query): cursor = self.__connect.cursor(cursor_factory = psycopg2.extras.RealDictCursor) try: cursor.execute(query) except psycopg2.ProgrammingError as error: print error return False try: a = cursor.fetchall() cursor.close() self.commit() return a except StandardError as error: print error return [] def commit(self): return self.__connect.commit() def close(self): return self.__connect.close()
class Test(): def __init__(self, t4db): self.t4db = t4db def out(self, input_p, sql): sleep(20) # сделал задержку дабы посмотреть количество соединений input_p.send(self.t4db.query(sql)) def consumer(pipe, len_out): output_p, input_p = pipe input_p.close() out = while True: out.append(output_p.recv()) if len(out) == len_out: input_p.close() break print out def producer(queries, input_p): for sql in queries: t4db = Trimis4_DB() # создаем новый экземпляр класса DB, а значит и создаем новое соединение с БД! t4t = Test(t4db) print id(t4db), id(t4rr) # смотрим идентификаторы экземпляров и убеждаемся, что для всех 5-ти sql они разные! trt = multiprocessing.Process(target=t4t.out, args=(input_p, sql)) trt.start() queries = [sql1, sql2, sql3, sql4, sql5] (output_p, input_p) = multiprocessing.Pipe() cons_p = multiprocessing.Process(target=consumer, args=((output_p, input_p), len(queries))) cons_p.start() producer(queries, input_p) cons_p.join()
Отредактировано grok (Май 23, 2012 10:15:09)
Офлайн
http://www.postgresql.org/docs/9.1/static/ecpg-connect.html
Раздел 33.2.2. Choosing a Connection
Офлайн
Проблема разрешилась следующим образом: не знаю почему, но один и тот же процесс не может создать больше одного соединения с БД, что выше и продемонстрировано; стоило перенести создание соединения в функцию, которая запускается параллельно, и, вуаля, создается еще одно соединение, что, собственно, и требовалось.
Офлайн
Была такая же проблема и тоже с multiprocessing,
решив обрадовался, но рано … возникла новая фигня:
когда пареллельно пишуться в базу новые данные
и они же нужны для дальнейшей работы позже (дёргаются из базы)
получается каша:
1-ый процесс установил соединение с базой дёрнул нужные данные,
сделал свои действия, проверил что “награбленных” новых данных нет в базе
попытался их закомитить
2-ой процесс также установил соединение с базой и тоже дёрнул данные
и среди них были данные, которые уже обрабатывал 1-ый процесс
он также прошёл по пути 1-го процесса и попытался закомитить данные
которые УЖЕ закомител или комитит 1-ый процесс
Всё это дело усугубляет большое число воркеров работающих над общей задачей одновременно.
Ясно что можно ставить какие-то локи но тогда не будет всё так быстро и эффект от параллельности
сводится к мимнимуму …
В конце конов пришёл к выводу что транзакции мне не нужны,
при большом числе апдейтов и селектов,
conn.autocommit = True
Офлайн
buran
и тоже дёрнул данные
и среди них были данные, которые уже обрабатывал 1-ый процесс
Офлайн
Думаю автокоммит тут не особо поможет. Он просто замажет проблему.
Офлайн
buranВообще то для транзакции есть разные уровни изоляции Transaction Isolation
проблема в том Postgresql для каджой сессии (транзакции) предоставляет
“снимок” данных базы и пока 1-ый процесс что-то делает, данные в базе уже поменялись
другими процессами, 1-ый процесс их не увидит никак
Офлайн
Вообще то для транзакции есть разные уровни изоляции Transaction Isolation
Офлайн
Если вы о посте grok, то он решил вопрос, используя именованные соединения, хотя и неявно.
Вынес коннект в отдельную функцию и при ее вызове каждый раз создавалось новое соединение с новым именем (имя), через которое и работал дальше.
Т.к. текста запросов нет, возможно, что у него тоже проблема с грязными данными.
Если о вашем деле, то налицо неправильно выбранный алгоритм.
Это классическая задача с одной очередью и несколькими воркерами. Погуглите решение.
Вы сделали неправильно.
У вас просто транзакции короче стали. Но исходной проблемы вы не убрали.
Как вы правильно заметили, вы замазали проблему.
Для наколенной программки пойдет, а если это где-то в реальной системе, то проблемы все еще впереди.
Офлайн
Хех … и Вы туда же
Если о вашем деле, то налицо неправильно выбранный алгоритм.
Вы сделали неправильно.
У вас просто транзакции короче стали. Но исходной проблемы вы не убрали.
Офлайн