Найти - Пользователи
Полная версия: Подвисает процесс блокируя queue.
Начало » Python для новичков » Подвисает процесс блокируя queue.
1
dregor
Никак не могу понять в чем причина и pdb юзал и strace-ом слушал процессы, если кто подскажет в чем проблема буду очень рад. Возможно сама идея скрипта и идиотская, но обсудить хотелось бы в чем именно проблема, а не идею, причина для создания такого кода есть, но обсуждать ее здесь нет причины. Если есть просто советы по улучшению кода тоже хорошо , python начал учить всего месяца полтора назад, пинать можно, я крепкий, выдержу Проблема описана чуть ниже перед вторым кусочком кода.

Класс дает общий интерфейс для работы с архивами :

class Arch():
    def __init__( self, name, path = None ):
        from os import path
        self.name = name
        if path == None:
            self.path = path.dirname( __file__ )
        else:
            self.path = path
    def name_list( self ):
        for item in self.arch_desc.infolist():
            if not item.filename.endswith('/'):
                yield item.filename
        #return self.arch_desc.namelist()
    def __str__(self):
        return self.arch_desc.name
    def extract( self, file_name ):
           self.arch_desc.extract( file_name )
class ZipArch( Arch ):
    def __init__( self, name, path = None ):
        Arch.__init__( self, name , path )
        self.open()
    def open( self ):
        from zipfile import ZipFile
        self.arch_desc = ZipFile( self.name, 'r' )
    def __str__( self ):
        return self.arch_desc.filename
class RarArch( Arch ):
    def __init__( self, name, path = None ):
        Arch.__init__( self, name , path )
        self.open()
    def open( self ):
        from rarfile import RarFile
        self.arch_desc = RarFile( self.name, 'r' )
    def name_list( self ):
        for item in self.arch_desc.infolist():
            if not item.isdir():
                yield item.filename
class TarArch( Arch ):
    def __init__( self, name, path = None ):
        Arch.__init__( self, name , path )
        self.open()
    def open( self ):
        import tarfile
        if self.name[-7:] == '.tar.gz' : readstate = 'r:gz'
        elif self.name[-8:] == '.tar.bz2' :  readstate = 'r:bz2'
        else : readstate = 'r:tar'
        self.arch_desc = tarfile.open( self.name , readstate )
    def name_list( self ):
        return [ i.name for i in self.arch_desc.getmembers() if not i.isdir() ]

Основной скрипт, запускает список процессов которые берут из очереди имена файлов архива и распаковывают их. Есть несколько проблем, основная : иногда один из процессов подвисает на блокировке очереди и все стопорится пока его не убьешь. Так же не пойму как можно было бы реализовать возвращение в очередь имени файла если процесс который его распаковывал был killed .

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
from multiprocessing import Process
import logging as Log
Log.basicConfig( level = Log.DEBUG,format = '%(asctime)s - %(name)s - %(levelname)s - %(process)d - %(processName)12s - %(message)s' )
class ExedProc( Process ):
    def __init__( self, queue, arch):
        Process.__init__( self )
        self.queue = queue
        self.arch = arch
        self.log = Log.getLogger('SubProc')
    def run( self ):
        import os
        while not self.queue.empty():
            file = self.queue.get( )
            self.arch.extract( file )
            self.log.info('Извлекается файл'+'  -  [' + file + ']')
        os._exit(0)
class Exed():
    def __init__( self, name, path = None, proc = None):
        from multiprocessing import Queue, cpu_count
        from executor import TarArch, RarArch, ZipArch
        #Логирование 
        self.log = Log.getLogger('main')
        self.log.info('Запуск распаковки %s' % (name) )
        #Тип архива
        if name[-8:] == '.tar.bz2' or name[-4:] == '.tar' or name[-7:] == '.tar.gz':
            self.arch = TarArch( name )
        elif name[-4:] == '.zip':
            self.arch =ZipArch( name )
        elif name[-4:] == '.rar':
            self.arch = RarArch( name )
        self.log.info('Тип архива : %s' % self.arch.__class__.__name__.__str__() )
        #Очередь
        self.queue = Queue()
        for item in self.arch.name_list():
            self.queue.put( item )
        #Лист процессов
        if proc == None:
            self.proc = cpu_count()
        else:
            self.proc = proc
        #self.proc = 1
        self.proc_list = [ ExedProc( self.queue, self.arch ) for item in range( self.proc )]
    def stop( self ):
        self.runing = False
        for item in self.proc_list:
            item.terminate()
    def start( self):
        self.runing = True
        for item in self.proc_list:
            item.start()
            print(item)
#Проверят жив ли хотя бы один из процессов
def is_alive( list ):
    for i in list:
        if i.is_alive():return True
    return False
if __name__ == '__main__':
    import os, sys, time
    test = Exed( os.path.dirname( __file__ ) + sys.argv[1] )
    test.start()
    while is_alive( test.proc_list ):
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            test.stop()
            test.log.warn( 'Остановлено' )
lorien
Сделайте чтение из очереди с таймаутом например на 20 сек т.е. поймайте состояние, когда наступает блокировка и изучите состояние системы в except блоке
dregor
Проблема как раз в том, что никаких ошибок не возникает, да и если бы возникали то трассировка бы мне больше пригодилась, чем экзепшн.

Проблема выглядит вот так :

1639 root 20 0 77416 16m 2392 S 0 1.1 0:01.19 python3 exed.py dir.tar
1657 root 20 0 86312 16m 1576 S 0 1.1 0:00.36 python3 exed.py dir.tar
1658 root 20 0 0 0 0 Z 0 0.0 0:00.70 <defunct>
1659 root 20 0 0 0 0 Z 0 0.0 0:00.38 <defunct>
1660 root 20 0 86312 16m 1576 S 0 1.1 0:00.30 python3 exed.py dir.tar
1661 root 20 0 0 0 0 Z 0 0.0 0:00.40 <defunct>
1662 root 20 0 0 0 0 Z 0 0.0 0:00.73 <defunct>
1663 root 20 0 0 0 0 Z 0 0.0 0:00.34 <defunct>
1664 root 20 0 0 0 0 Z 0 0.0 0:00.33 <defunct>

strace -p 1639 (основной процесс просто ждет KeyboardInterrupt и если получает то terminate процессы. )
Process 1639 attached
select(0, NULL, NULL, NULL, {0, 95291}) = 0 (Timeout)
wait4(1657, 0x7fff4da8f654, WNOHANG, NULL) = 0
select(0, NULL, NULL, NULL, {1, 0}) = 0 (Timeout)
wait4(1657, 0x7fff4da8f654, WNOHANG, NULL) = 0

strace -p 1657 ( вероятно проблема здесь, ожидает ввода, но ума не прилажу с чего вдруг. )
Process 1657 attached
read(4,

strace -p 1660 ( а вот тут futex и есть, ждет пока разблокируется )
Process 1660 attached
futex(0x7fb02353f000, FUTEX_WAIT, 0, NULL


This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB