Уведомления

Группа в Telegram: @pythonsu

#1 Янв. 3, 2011 13:06:10

alexlp
От:
Зарегистрирован: 2009-11-12
Сообщения: 40
Репутация: +  0  -
Профиль   Отправить e-mail  

rpyc + multiprocessing

У меня работает веб-сервер (pylons), который должен формировать отчеты для пользователей.
Отчеты довольно большие, поэтому суточный “дамп” я записываю в текстовый файл через знаки-разделители (для удобного поиска через регулярные выражения).

На сервере установлено 4 ЦПУ, к тому же еще 2-ядерных и поддержкой Hyperthreading.
Возникла потребность делать вызовы не непосредственно из веб-сервера, т.к. насколько я понимаю, все будет выполняться в пределах одного ЦПУ, а используя модуль multiprocessing на отдельном демоне. Обращение к демону через rpyc.

Иногда “отчет” срабатывает, иногда нет

Вот демон

from rpyc.utils.server import ThreadedServer
from multiprocessing import Process, Queue
numprocs = 15
filelist = os.listdir('/var/www/reports/dumpfiles')

def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
# First fork
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
sys.stderr.write('Fork #1 failed: (%d) %s\n' % (e.errno, e.strerror))
sys.exit(1)
os.chdir('/')
os.umask(0)
os.setsid()
# Second fork
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
sys.stderr.write('Fork #2 failed: (%d) %s\n' % (e.errno, e.strerror))
sys.exit(1)
print “It's alive!\n”

for f in sys.stdout, sys.stderr: f.flush()
si = file(stdin, ‘r’)
so = file(stdout, ‘a+’)
se = file(stderr, ‘a+’, 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())

t = ThreadedServer(ks2Service, port = 18861)
t.start()


def calculate(s_id, date_from, date_to):
# подсчет открывается файл из dumpfiles считается, файл закрывается
return result

class mywebService(rpyc.Service):
def exposed_rep1count(self, **qdic):
q = Queue()
outq = Queue()

tempList = qdic
date_from = datetime.datetime.strptime(qdic, “%Y.%m.%d”)
date_to = datetime.datetime.strptime(qdic, “%Y.%m.%d”)

# buildIndex(t_from, t_to)

def f(i,q):
while True:
if q.empty():
sys.exit()
serv = q.get()
outq.put(calculate(serv, date_from, date_to))

for item in tempList:
q.put(item)

processes =
for i in range(numprocs):
p = Process(target=f, args=)
p.start()
processes.append(p)

for p in processes:
p.join()

result =
while not outq.empty():
result.append(outq.get())

return pickle.dumps(result)

if __name__ == ‘__main__’:
daemonize(stdout='/var/log/mywebrpycserv.log', stderr='/var/log/mywebrpycserv.err')



Отредактировано (Янв. 3, 2011 13:06:54)

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version