Форум сайта python.su
0
Привет всем!
Есть такая задача: принимать соединения по сокетам, получать входные данные для сложных вычислений, вычислять и отдавать ответ. Запросов одновременно может быть очень много.
Сначала я понял, что из-за GIL я не смогу использовать нормально потоки, и пробовал обернуть всё в C++, подключая boost:threads и boost:python, запуская потоки в C++ и производя вычисления в каждом потоке в субинтерпретаторе python, но нифига не вышло, всёравно проц с 8 ядрами не юзался на 100%.
В итоге я решил, что придётся только через multiprocessing, и оптимально будет создать сразу n worker'ов (кол-во ядер*2), чтобы они сидели и ждали заданий. Таким образом не тратится время на создание процесса, и они не расплодятся более чем n количество.
С сокетами и потоками я больше работал в C++, поэтому в python'е только сталкиваюсь с этим.
Есть такой скрипт:
#!/usr/bin/env python import multiprocessing as mp import logging import socket import time logger = mp.log_to_stderr(logging.DEBUG) def worker(socket): while True: client, address = socket.accept() logger.debug("{u} connected".format(u=address)) #hard and long calculations client.send("OK") client.close() if __name__ == '__main__': num_workers = 5 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind(('',1111)) serversocket.listen(5) workers = [mp.Process(target=worker, args=(serversocket,)) for i in range(num_workers)] for p in workers: p.daemon = True p.start() while True: try: time.sleep(10) except: break
Офлайн
0
Ок, я нашёл что-то похожее на то, что мне нужно. Но оно не работает.
После подключения телнетом, соединение сразу закрывается. Почему?
#!/usr/bin/env python import os import sys import SocketServer import Queue import time import socket import multiprocessing from multiprocessing.reduction import reduce_handle from multiprocessing.reduction import rebuild_handle class MultiprocessWorker(multiprocessing.Process): def __init__(self, sq): self.SLEEP_INTERVAL = 1 # base class initialization multiprocessing.Process.__init__(self) # job management stuff self.socket_queue = sq self.kill_received = False def run(self): while not self.kill_received: try: h = self.socket_queue.get_nowait() fd=rebuild_handle(h) client_socket=socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM) #client_socket.send("hellofromtheworkerprocess\r\n") received = client_socket.recv(1024) print "Recieved on client: ",received client_socket.close() except Queue.Empty: pass #Dummy timer time.sleep(self.SLEEP_INTERVAL) class MyTCPHandler(SocketServer.BaseRequestHandler): """ The RequestHandler class for our server. It is instantiated once per connection to the server, and must override the handle() method to implement communication to the client. """ def handle(self): # self.request is the TCP socket connected to the client #self.data = self.request.recv(1024).strip() #print "{} wrote:".format(self.client_address[0]) #print self.data # just send back the same data, but upper-cased #self.request.sendall(self.data.upper()) #Either pipe it to worker directly like this #pipe_to_worker.send(h) #instanceofmultiprocessing.Pipe #or use a Queue :) h = reduce_handle(self.request.fileno()) socket_queue.put(h) if __name__ == "__main__": #Mainprocess address = ('localhost', 8082) server = SocketServer.TCPServer(address, MyTCPHandler) socket_queue = multiprocessing.Queue() for i in range(5): worker = MultiprocessWorker(socket_queue) worker.start() try: server.serve_forever() except KeyboardInterrupt: sys.exit(0)
Офлайн
15
Офлайн