Уведомления

Группа в Telegram: @pythonsu

#1 Июнь 20, 2010 15:13:07

ZZZ
От: Москва
Зарегистрирован: 2008-04-03
Сообщения: 2161
Репутация: +  26  -
Профиль   Адрес электронной почты  

Сигнал прерывания с клавиатуры

Привет всем.

У меня странная проблема… Как и всегда…

import os
import sys
import signal

import config
import core

def start():
global core_main
core_main = core.Core()
core_main.start()

def stop():
print "Stoping..."
core_main.stop()

if __name__ == '__main__':
signal.signal(signal.SIGINT, lambda *a: stop())
start()
Не работает. stop не вызывается.
В core.Core.start создаётся нить и к ней делается join. На данный момент в run нити только while с бесконечным time.sleep(1.0).
Если меняю start на time.sleep(10) – т.е. не создаю нить – то всё прекрасно.

Чего я не понимаю?

P.S. Пробовал отлавливать другие сигналы – они перехватываются, но stop не вызывается.



Отредактировано (Июнь 20, 2010 15:38:40)

Офлайн

#2 Июнь 20, 2010 17:59:49

ZZZ
От: Москва
Зарегистрирован: 2008-04-03
Сообщения: 2161
Репутация: +  26  -
Профиль   Адрес электронной почты  

Сигнал прерывания с клавиатуры

Решил переопределением join в нити вот таким образом:

    def join(self):
while not self.is_alive():
time.sleep(1.0)
Какой-то костыль получился, но работает как надо.

Вообще, мне кажется, проблема в том, что join написан на Си и является атомарным для питона.



Офлайн

#3 Июль 8, 2010 15:44:58

Alex2ndr
От:
Зарегистрирован: 2009-12-26
Сообщения: 204
Репутация: +  0  -
Профиль   Отправить e-mail  

Сигнал прерывания с клавиатуры

Всем доброго времени суток!

Данная проблема получила продолжение.

Есть некий поток:

class RPCThread(threading.Thread):

flag_stop = True

def __init__(self):
super(RPCThread, self).__init__()
addr = (config.RPC_SERVER_ADDRESS, config.RPC_SERVER_PORT)
self.rpc = SimpleXMLRPCServer(addr)
self.log = logging.getLogger(self.__class__.__name__)

def stop(self):
self.log.info("Stoping")
self.flag_stop = True

def run(self):
self.log.info("Starting")
self.flag_stop = False
...
self.serve_forever()

def join(self):
while not self.isAlive():
time.sleep(1.0)

def serve_forever(self):
"""Handle requests while stop"""
while not self.flag_stop:
self.handle_request()
...
Иногда, при вызове метода stop происходит следующее: флаг self.isAlive начинает возвращать False. В результате цикл в функции join начинает бесконечно выполняться. Значение флага назад на True не меняется. Таким образом завершение потока(и программы тоже) не происходит. Приходится убивать посылая sigterm. Очевидной закономерности нет. Вроде бы если послать сигнал выключения через ~0.9 - 1.1 сек. после запуска.

У кого-нибудь есть мысли, почему/из-за чего это происходит?

Всем спасибо за внимание!



Офлайн

#4 Июль 15, 2010 15:01:25

Alex2ndr
От:
Зарегистрирован: 2009-12-26
Сообщения: 204
Репутация: +  0  -
Профиль   Отправить e-mail  

Сигнал прерывания с клавиатуры

Всем доброго времени суток!

Разобрался я в этой проблеме. Имя ей - GIL. Более подробное описание наших изысканий:
Есть некая программа, которая должна одновременно выполнять 2-е функции: Собирать некие данные и общаться с клиентами(т е передавать им данные). Т к недопустимо, чтобы клиенты ждали конца сбора данных, то логично выделать 2-е разные нити под эти процессы - тогда сбор данных и общение с клиентами будет вестись параллельно. обращаю ваше внимание на один момент - НЕЛЬЗЯ предсказать, когда каждая из этих нитей завершится. Теоретически они могут работать бесконечно. Практически же - до тех пор, пока администратор не остановит.
Первое решение которое приходит в голову - в этих потоках запускаем бесконечный цикл с проверкой флага:

flag_stop = False
while not flag_stop:
time.sleep(1.0)
потом связать flag_stop = True с нужным сигналом(например SIGINT). По сигналу флаг сменится и цикл остановится. Но не все так просто. Данная тема началась с того, что это как раз таки не работает. После того как исполняем threading.Thread.join() сигналы перестают приниматься. А происходит это потому, что в join() вызывается блокировка ( thread.allocate_lock() ). После этого потоки начинают варится в собственном соку и достучаться до них снаружи - не получится. Как я понимаю это происходит из-за GIL. После некоторого гуления нашел следующий траблтикет: Issue1167930, в котором обсуждается аналогичная проблема. Но увы - обсуждение быстро закончилось следующим сообщением Гвидо:
Guido van Rossum
This is because the regular acquire() method on a basic lock cannot be interrupted. That's unlikely to go away, so you'll just have to live with this. As you've discovered, specifying a timeout solves the issue (sort of).
А что это он говорил про timeout? Изучив документацию видим, что в join можно передать некоторую задержку. А что же она нам дает? Дальше начинаются мои собственные изыскания - опытным путем было установлено, что блокировка не наступает, до тех пор, пока не истечет задержка. Т е передали 10 - в течении 10 сек можно передать сигнал и он сработает как планировалось. Но после - увы - все по прежнему. Т к предсказать интервал, за который отработают процессы я не могу - он бесконечен, то этот путь нам не подходит.
Второе решение - переписать join без lock я пока оставляю без внимания - именно так и сделал ZZZ, но это породило другие проблемы. Видимо не зря там эта блокировка стоит. Тем более что Гвидо написал, что с этим придется жить - тоже наверно не зря…
Третье решение - раз гора не идет к Магомеду, то Магомед пойдет к горе. Да - снаружи послать сигнал нельзя, но ведь можно остановить процессы изнутри! Для этого надо сделать там проверку некого условия, которое находится снаружи. На практике это может быть так:создаем некий файлик и пишем туда 0. Процесс каждую итерацию проверяет этот ноль. В нужный нам момент меняем 0 на 1 и процесс, обнаружив эту замену прекращает выполнение. Данный принцип может быть основан не только на файлах, но и на чем-нибудь ином, например сокетах, пайпах и тд. Важен принцип - Нить завершает сама себя.
Для нашей программы я использовал следующий способ. Т к один из потоков это rpc сервер, то я просто зарегистрировал метод остановки в доступных к вызову функциях. Таким образом для остановки достаточно соединится с нужным сокетом и вызвать нужный метод - 2 строки. Не знаю насколько это грамотно. Но останавливается без проблем.

Пожелания и дополнения приветствуются.



Офлайн

#5 Июль 15, 2010 16:39:08

ZZZ
От: Москва
Зарегистрирован: 2008-04-03
Сообщения: 2161
Репутация: +  26  -
Профиль   Адрес электронной почты  

Сигнал прерывания с клавиатуры

Нить действительно должна завершать сама себя при “сигнале” снаружи. Под сигналом я имею ввиду любой флаг, функцию… Но синхронизация нитей сокетом и уж тем более файлом – отбойным молотком тараканов глушить! Оно может и работает, но уж совсем некрасиво. Да и проблема у нас немного в другом, если я правильно её вижу.
У нас нить-то (метод run) как раз завершается. Притом завершается от спокойно и красиво.По обычному флагу. А вот метод isAlive всё равно говорит, что нить работает. Т.е. проблема не в том, что нить не завершается, а в том, что threading.Thread всё равно думает, что нить работает.
Хорошо, будем думать, что это GIL. Просто после окончания работы нити, она пытается взять лок, чтобы сказать о том, что она закончила работу и ждёт его до второго пришествия.
Истина где-то рядом…



Офлайн

#6 Июль 15, 2010 21:53:24

Alex2ndr
От:
Зарегистрирован: 2009-12-26
Сообщения: 204
Репутация: +  0  -
Профиль   Отправить e-mail  

Сигнал прерывания с клавиатуры

ZZZ, ты забываешь про еще один момент. Ты написал свой join, он получится проще и без lock. А как ты думаешь, зачем вообще написали такой вот join который есть сейчас? Ведь твой вариант гораздо проще и очевиднее. Я приведу здесь содержимое этого join:

   def join(self, timeout=None):
if not self.__initialized:
raise RuntimeError("Thread.__init__() not called")
if not self.__started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")

if __debug__:
if not self.__stopped:
self._note("%s.join(): waiting until thread stops", self)
self.__block.acquire()
try:
if timeout is None:
while not self.__stopped:
self.__block.wait()
if __debug__:
self._note("%s.join(): thread stopped", self)
else:
deadline = _time() + timeout
while not self.__stopped:
delay = deadline - _time()
if delay <= 0:
if __debug__:
self._note("%s.join(): timed out", self)
break
self.__block.wait(delay)
else:
if __debug__:
self._note("%s.join(): thread stopped", self)
finally:
self.__block.release()
здесь self.__block это thread.allocate_lock().
Это отличается от твоего фактически только наличием блокировки(т к self.__stopped это почти is_alive). “Это жжжж не спроста”(с). Не думаешь же ты, что те кто писал этот код тупее тебя? Мне думается что блокировка все-таки нужна и вот такое огульное переписывание join к хорошему не приведет. Кодом доказать это пока не могу. Привожу статьи, которые меня на это мнение натолкнули:
http://jessenoller.com/2009/02/01/python-threads-and-the-global-interpreter-lock/
http://keysolutions.ru/articles/osnovy-raboty-s-potokami-v-python
http://docs.python.org/c-api/init.html
Пока мое мнение такое - блокировка нужна для того, чтобы предотвратить совместный доступ потоков к какому либо объекту одновременно. Это может быть страница памяти, файл и тд. Пока наши потоки фактически ничего не делают, поэтому и проблемм мы не встретили, но дальше можем наткнуться. Т е оставлять за спиной потенциальную бомбу не хочется.
Кроме того остановка это редкое явление для сервера. Можно сказать что нештатная ситуация. Теоретически мы даже можем останавливать сервер через SIGKILL. Я считаю что корректная обработка такой редкой ситуации не стоит проблемм с потоками.
Продолжаю читать доки. Ищу фактические доказательства.



Отредактировано (Июль 15, 2010 22:06:19)

Офлайн

#7 Июль 17, 2010 13:53:24

ZZZ
От: Москва
Зарегистрирован: 2008-04-03
Сообщения: 2161
Репутация: +  26  -
Профиль   Адрес электронной почты  

Сигнал прерывания с клавиатуры

Да, я согласен – блок нужен. Можно написать так…

def join(self, timeout=1.0):
while self.isAlive():
super(threading.Thread, self).join(timeout)
У меня сейчас не получается заняться такими проверками… Попробуешь?
Оно должно работать с обычным флагом.



Офлайн

#8 Июль 17, 2010 21:23:28

Alex2ndr
От:
Зарегистрирован: 2009-12-26
Сообщения: 204
Репутация: +  0  -
Профиль   Отправить e-mail  

Сигнал прерывания с клавиатуры

ZZZ
У меня сейчас не получается заняться такими проверками… Попробуешь?
Оно должно работать с обычным флагом.
Попробовал. Написал вот такой пример -
#!/usr/bin/env python
# vim: sw=4 ts=4 expandtab ai

import time
import threading
import signal

class Trtest(threading.Thread):

def __init__(self):
self.flagstop = False
super(Trtest, self).__init__()

def run(self):
print "start"
self.flagstop = False
while not self.flagstop:
print "run loop flag =", self.flagstop
time.sleep(1.0)

def join(self, timeout=1.0):
while self.isAlive():
super(Trtest, self).join(timeout)

def stop(self):
print "stop"
self.flagstop = True

t = Trtest()
signal.signal(signal.SIGINT, lambda *a: t.stop())
t.start()
t.join()
print "Done!"
Только join немного переделал. Вместо
super(threading.Thread, self).join(timeout)
написал
super(Trtest, self).join(timeout)
иначе при запуске выдает вот это -
 alex@kubu-book:~/development/PYTHON_projects$ python ./threadtest.py 
start
run loop
Traceback (most recent call last):
File "./threadtest.py", line 32, in <module>
t.join()
File "./threadtest.py", line 23, in join
super(threading.Thread, self).join(timeout)
AttributeError: 'super' object has no attribute 'join'
Exception AttributeError: "'super' object has no attribute 'join'" in <module 'threading' from '/usr/lib/python2.6/threading.pyc'> ignored
Вообщем результат нормальный - завершается как положено:
 alex@kubu-book:~/development/PYTHON_projects$ python ./threadtest.py
start
run loop flag = False
run loop flag = False
run loop flag = False
run loop flag = False
^Cstop
Done!
alex@kubu-book:~/development/PYTHON_projects$
Даже вот так протестировал:
for i in `seq 100`; do python ./threadtest.py & sleep $i; kill -2 %1; sleep 2.0;  done
Все пока нормально. Я правда не знаю что там с этим таймаутом. Не будет ли какого косяка от того что там идет ожидание. Но можно взять за рабочую версию.



Офлайн

#9 Июль 18, 2010 11:46:37

ZZZ
От: Москва
Зарегистрирован: 2008-04-03
Сообщения: 2161
Репутация: +  26  -
Профиль   Адрес электронной почты  

Сигнал прерывания с клавиатуры

Alex2ndr
Только join немного переделал.
Вот блин… Косячу… Сорри.

Alex2ndr
Даже вот так протестировал:
:-) Красиво!

Alex2ndr
Я правда не знаю что там с этим таймаутом. Не будет ли какого косяка от того что там идет ожидание.
Да вроде не должно. Давай пока так, это действительно правильнее, чем цеплять к rpc то, что ему не свойственно.



Офлайн

#10 Июль 18, 2010 21:32:46

Alex2ndr
От:
Зарегистрирован: 2009-12-26
Сообщения: 204
Репутация: +  0  -
Профиль   Отправить e-mail  

Сигнал прерывания с клавиатуры

Переделал в проекте назад на сигналы. Только на обычных флагах он что-то подвисает. Останавливается не с 1-го сигнала, а со 2-го или 3-го. И тормозит в остановке. Не знаю почему так. Сделал на threading.Event - вроде таких граблей нет. Я постараюсь сделать примерчик, чтобы понять где он там висит.



Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version