Уведомления

Группа в Telegram: @pythonsu

#1 Май 11, 2013 17:15:17

kosolapovdg
Зарегистрирован: 2013-05-11
Сообщения: 8
Репутация: +  0  -
Профиль   Отправить e-mail  

Некорректно работает sqlalchemy, если работает несколько потоков.

Добрый вечер. Раньше скрипт работал в один поток. Появился компьютер, с несколькими ядрами процессора. Для увеличения быстродействия сделал распараллеливание расчетов. Появились проблемы с записью в базу данных:

python qwe.py 
/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py:1391: SAWarning: Usage of the 'Session.add()' operation is not currently supported within the execution stage of the flush process. Results may not be consistent. Consider using alternative event listeners or connection-level operations instead.
self._flush_warning("Session.add()")
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "qwe.py", line 91, in thread_call_manager
call_manager(manager)
File "qwe.py", line 80, in call_manager
session.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 720, in commit
self.transaction.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 353, in commit
self._prepare_impl()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 333, in _prepare_impl
self.session.flush()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1817, in flush
self._flush(objects)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1895, in _flush
subtransactions=True)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 644, in begin
nested=nested)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 214, in _begin
self._assert_active()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 181, in _assert_active
"This session is in 'prepared' state; no further "
InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.

Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "qwe.py", line 91, in thread_call_manager
call_manager(manager)
File "qwe.py", line 80, in call_manager
session.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 720, in commit
self.transaction.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 353, in commit
self._prepare_impl()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 333, in _prepare_impl
self.session.flush()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1817, in flush
self._flush(objects)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1935, in _flush
transaction.rollback(_capture_exception=True)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 61, in __exit__
compat.reraise(type_, value, traceback)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1935, in _flush
transaction.rollback(_capture_exception=True)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 369, in rollback
self._assert_active(prepared_ok=True, rollback_ok=True)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 202, in _assert_active
raise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed

Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "qwe.py", line 91, in thread_call_manager
call_manager(manager)
File "qwe.py", line 80, in call_manager
session.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 720, in commit
self.transaction.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 365, in commit
self.close()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 420, in close
connection.close()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 585, in close
del self.__connection
AttributeError: _Connection__connection

Exception in thread Thread-4:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "qwe.py", line 91, in thread_call_manager
call_manager(manager)
File "qwe.py", line 80, in call_manager
session.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 720, in commit
self.transaction.commit()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 365, in commit
self.close()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 430, in close
self.session.begin()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 651, in begin
self, nested=nested)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 161, in __init__
self._take_snapshot()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 237, in _take_snapshot
self.session.flush()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1817, in flush
self._flush(objects)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1935, in _flush
transaction.rollback(_capture_exception=True)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 58, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1899, in _flush
flush_context.execute()
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 372, in execute
rec.execute(self)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 525, in execute
uow
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 63, in save_obj
table, insert)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 537, in _emit_insert_statements
execute(statement, multiparams)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 662, in execute
params)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 761, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 874, in _execute_context
context)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1024, in _handle_dbapi_exception
exc_info
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 163, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 867, in _execute_context
context)
File "/home/dmitry/web/analytics/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 324, in do_execute
cursor.execute(statement, parameters)
IntegrityError: (IntegrityError) duplicate key value violates unique constraint "manager_log_pkey"
DETAIL: Key (id)=(1162) already exists.
'INSERT INTO manager_log (id, direction, price, count, gain, date) VALUES (%(id)s, %(direction)s, %(price)s, %(count)s, %(gain)s, %(date)s)' {'count': 3, 'direction': 1, 'price': 10, 'gain': 0.1, 'date': datetime.datetime(2013, 5, 11, 20, 55, 16, 894727), 'id': 1162}

^CTraceback (most recent call last):
File "qwe.py", line 121, in <module>
execute_managers()
File "qwe.py", line 107, in execute_managers
queue_manager.put(manager)
File "/usr/lib/python2.7/Queue.py", line 137, in put
self.unfinished_tasks += 1
KeyboardInterrupt

В теле сообщения прикрепил файл с исходным кодом. Сам код – это воссоздание тех ошибок, которые я наблюдаю при выполнении моего скрипта.
Вопрос такой: как исправить ошибки?

Прикреплённый файлы:
attachment qwe.py (3,4 KБ)

Офлайн

#2 Май 12, 2013 09:32:31

plusplus
От:
Зарегистрирован: 2009-01-05
Сообщения: 418
Репутация: +  15  -
Профиль   Отправить e-mail  

Некорректно работает sqlalchemy, если работает несколько потоков.

kosolapovdg
Для увеличения быстродействия сделал распараллеливание расчетов.

В данном случае потоки не помогут, прочитай про GIL в Python.

Для многопоточности в sqlalchemy нужно использовать scoped_session, вот тут пример у меня был http://python4seo.net/?p=255.



Офлайн

#3 Май 12, 2013 10:30:11

kosolapovdg
Зарегистрирован: 2013-05-11
Сообщения: 8
Репутация: +  0  -
Профиль   Отправить e-mail  

Некорректно работает sqlalchemy, если работает несколько потоков.

plusplus
В данном случае потоки не помогут, прочитай про GIL в Python.
Для многопоточности в sqlalchemy нужно использовать scoped_session, вот тут пример у меня был http://python4seo.net/?p=255.
Ни как способ не помог. А вообще, нужно для каждого потока создавать свою сессию? Чтобы GIL не мешал.

Отредактировано kosolapovdg (Май 12, 2013 10:31:29)

Офлайн

#4 Май 12, 2013 22:33:27

plusplus
От:
Зарегистрирован: 2009-01-05
Сообщения: 418
Репутация: +  15  -
Профиль   Отправить e-mail  

Некорректно работает sqlalchemy, если работает несколько потоков.

kosolapovdg
Ни как способ не помог. А вообще, нужно для каждого потока создавать свою сессию? Чтобы GIL не мешал.

Надо почитать про GIL и понять в каких случаях потоки дадут пользу, а в каких нет. Потоки нужно использовать при каких-нибудь сетевых запросах, например, если же нужно за счет ядер увеличить быстродействие, то нужно смотреть в сторону многопроцессовости.

Вообще с SQL-сервером работать желательно в один коннект и один поток. Создание нескольких коннектов/сессий нагибает sql-сервер, а работа в нескольких потоках с базой не дает ничего.



Офлайн

#5 Май 13, 2013 10:19:10

kosolapovdg
Зарегистрирован: 2013-05-11
Сообщения: 8
Репутация: +  0  -
Профиль   Отправить e-mail  

Некорректно работает sqlalchemy, если работает несколько потоков.

plusplus
Вообще с SQL-сервером работать желательно в один коннект и один поток. Создание нескольких коннектов/сессий нагибает sql-сервер, а работа в нескольких потоках с базой не дает ничего.
Сейчас сделал две версии: с процессами и с потоками. Скрипт, который работает с потоками, выполняется за 5512 секунд, а скрипт, который работает с процессами выполняется за 2821 секунду. То есть производительность в два раза выше.

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version