Уведомления

Группа в Telegram: @pythonsu

#1 Апрель 15, 2014 21:49:35

nemereno
Зарегистрирован: 2014-04-15
Сообщения: 2
Репутация: +  0  -
Профиль   Отправить e-mail  

Добавить очередь в multiprocessing socket workers

Привет всем!
Есть такая задача: принимать соединения по сокетам, получать входные данные для сложных вычислений, вычислять и отдавать ответ. Запросов одновременно может быть очень много.
Сначала я понял, что из-за GIL я не смогу использовать нормально потоки, и пробовал обернуть всё в C++, подключая boost:threads и boost:python, запуская потоки в C++ и производя вычисления в каждом потоке в субинтерпретаторе python, но нифига не вышло, всёравно проц с 8 ядрами не юзался на 100%.

В итоге я решил, что придётся только через multiprocessing, и оптимально будет создать сразу n worker'ов (кол-во ядер*2), чтобы они сидели и ждали заданий. Таким образом не тратится время на создание процесса, и они не расплодятся более чем n количество.

С сокетами и потоками я больше работал в C++, поэтому в python'е только сталкиваюсь с этим.

Есть такой скрипт:

#!/usr/bin/env python
import multiprocessing as mp
import logging
import socket
import time
 
logger = mp.log_to_stderr(logging.DEBUG)
 
def worker(socket):
    while True:
        client, address = socket.accept()
        logger.debug("{u} connected".format(u=address))
        #hard and long calculations
        client.send("OK")
        client.close()
 
 
if __name__ == '__main__':
    num_workers = 5
 
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('',1111))
    serversocket.listen(5)
 
    workers = [mp.Process(target=worker, args=(serversocket,)) for i in
            range(num_workers)]
 
    for p in workers:
        p.daemon = True
        p.start()
 
    while True:
        try:
            time.sleep(10)
        except:
            break

Сейчас он создаёт 5 воркеров, но больше 5 он соединений не примет.
Нужно переделать его, чтобы создавалась очередь из этих соединений, т.е. по сути worker выполнил вычисления и закрыл соединение, освободившись, и взял следующий сокет.

Знатокам потребуется добавить сюда пару строчек, только вот каких… прошу помочь

Офлайн

#2 Апрель 16, 2014 14:35:41

nemereno
Зарегистрирован: 2014-04-15
Сообщения: 2
Репутация: +  0  -
Профиль   Отправить e-mail  

Добавить очередь в multiprocessing socket workers

Ок, я нашёл что-то похожее на то, что мне нужно. Но оно не работает.
После подключения телнетом, соединение сразу закрывается. Почему?

#!/usr/bin/env python   
import os
import sys
import SocketServer
import Queue
import time
import socket
import multiprocessing
from multiprocessing.reduction import reduce_handle
from multiprocessing.reduction import rebuild_handle 
class MultiprocessWorker(multiprocessing.Process):
    def __init__(self, sq):
        self.SLEEP_INTERVAL = 1
        # base class initialization
        multiprocessing.Process.__init__(self)
        # job management stuff
        self.socket_queue = sq
        self.kill_received = False
    def run(self):
        while not self.kill_received:
            try:     
                h = self.socket_queue.get_nowait()          
                fd=rebuild_handle(h)
                client_socket=socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM)
                #client_socket.send("hellofromtheworkerprocess\r\n")
                received = client_socket.recv(1024)
                print "Recieved on client: ",received
                client_socket.close()
            except Queue.Empty:
                pass
            #Dummy timer
            time.sleep(self.SLEEP_INTERVAL)
class MyTCPHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our server.
    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """
    def handle(self):
        # self.request is the TCP socket connected to the client
        #self.data = self.request.recv(1024).strip()
        #print "{} wrote:".format(self.client_address[0])
        #print self.data
        # just send back the same data, but upper-cased
        #self.request.sendall(self.data.upper())
        #Either pipe it to worker directly like this
        #pipe_to_worker.send(h) #instanceofmultiprocessing.Pipe
        #or use a Queue :)
        h = reduce_handle(self.request.fileno())
        socket_queue.put(h)
if __name__ == "__main__":
    #Mainprocess
    address =  ('localhost', 8082)
    server = SocketServer.TCPServer(address, MyTCPHandler)
    socket_queue = multiprocessing.Queue()
    for i in range(5):
        worker = MultiprocessWorker(socket_queue)
        worker.start()
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)

Офлайн

#3 Апрель 16, 2014 14:49:10

plusplus
От:
Зарегистрирован: 2009-01-05
Сообщения: 418
Репутация: +  15  -
Профиль   Отправить e-mail  

Добавить очередь в multiprocessing socket workers

Я рекоменудю посмотреть twisted. Асинхронный фреймворк, tcp-сервер на нем пишется в пару строчек. Есть модули для работы с процессами, что позволит загрузить все ядра.



Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version