Найти - Пользователи
Полная версия: Многопоточный TCP сервер, база данных и очереди
Начало » Network » Многопоточный TCP сервер, база данных и очереди
1
battlecoder
Всем привет.

Есть сервер. Сначала думал над тем, как вручную реализовать, потом наткнулся на http://docs.python.org/py3k/library/socketserver.html и решил, что это то, что мне надо.

Собственно, вся реализация сервера:

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

Обрабатывает все запросы некий класс

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        cur_thread = threading.current_thread()
        response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
        self.request.sendall(response)

Всё работает как надо (если скопировать пример из дока). Нюанс вот в чём. Мне нужно чтобы сервер обслуживал многих клиентов, а они могли выполнять какие-то операции. Например, пользователь хочет зарегистрироваться - для этого клиент посылает специальный запрос, сервер его обрабатывает, и он должен добавить запись в базу данных (использую sqlite).

Но sqlite нельзя использовать из множества потоков!! Решить проблему можно так - выбрать один поток, который и будет писать в базу данных и выполнять все операции. Можно, например взять очередь http://docs.python.org/py3k/library/queue

сделал примерно так, создаю очередь и делаю её полем класса:

tasks = queue.Queue()
ThreadedTCPRequestHandler.tasks = tasks

Также передаю в специальный класс обработчик, которую запускаю в отдельном потоке (и этот поток всегда одинаковый, запускаю его ОДИН раз)

worker = Worker(tasks)

Ну реализация примерно такая

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        ThreadedTCPRequestHandler.tasks.put(data)
        # что посылать в ответ??
        self.request.sendall(???)
class Worker:
    def __init__(self, tasks):
        self.tasks = tasks
    def do(self):
        while True:
            item = q.get()
            # делаю тут что-то, например, работаю с базой данных sqlite
            q.task_done()

Собственно, мне нужно узнать результат выполнения, выполнена ли команда успешно или вообще не выполнена… и вернуть результат клиент (через sock.sendall(b'что-то'). Как это сделать? %) мне создать ещё одну очередь? такой вариант наверное не прокатит…

Я пробовал просто отправлять в очередь tasks = queue.Queue() экземпляры сокетов (и делать sock.recv/sock.send уже из класса Worker) - но такой вариант вообще не работает, сокет становится закрытым… то есть с сокетом можно получается работать только из того потока, который обслуживает клиента.

Что делать? %)
mindless
можно сделать каждому потоку очередь в которую worker будет скидывать результат
def handle(self):
    data = str(self.request.recv(1024), 'ascii')
    myname = threading.current_thread().name
    ThreadedTCPRequestHandler.tasks.put((myname, data))
    result = ThreadedTCPRequestHandler.results[myname].get()
    self.request.sendall(result)
while True:
    name, item = q.get()
    # делаю тут что-то, например, работаю с базой данных sqlite
    ThreadedTCPRequestHandler.results[name].put(smth)
    q.task_done()

upd: почему подсветка по умолчанию не python?
upd2: если писать code] вручную
odnochlen
mindless
upd: почему подсветка по умолчанию не python?
У меня питон, ЧЯДНТ?
Lexander
Если вы хотите писать сами низкоуровневые вещи, рекомендую изучить хотя бы потоки.
Вот есть хороший пример сервера, который создает пул потоков-обработчиков для каждого клиента:
http://keysolutions.ru/articles/osnovy-raboty-s-potokami-v-python
battlecoder
Мм, спасибо за ссылку. Может подумаю о том, чтобы создать пул потоков.

С этим вроде бы разобрался (без пула), сделал так, чтобы всё отправлялось через один и тот же сокет, и через него же принималось.
Вкратце - есть поток, который прослушивает сокет, и если пришло сообщение - парсит его. Если это notification, кладёт его в одну очередь, а если response - в другую.
И другой поток (по сути главный поток) является клиентом, он может отправлять запросы (request) в этот же самый socket, но response (ответы) он оттуда не берёт, а вместо этого запрашивает их из очереди (в которую их кладёт первый поток).

Довольно гибкая система, хотя код IMHO всё равно получился ужасен не привык я пока к питону. Но работает. Как надо.

Осталось с GUI нормальным разобраться ну и с прочими тонкостями этого приложения. Но это уже совсем другая история.
altRUist
battlecoder
Нюанс вот в чём. Мне нужно чтобы сервер обслуживал многих клиентов, а они могли выполнять какие-то операции. Например, пользователь хочет зарегистрироваться - для этого клиент посылает специальный запрос, сервер его обрабатывает, и он должен добавить запись в базу данных (использую sqlite).Но sqlite нельзя использовать из множества потоков!! Решить проблему можно так - выбрать один поток, который и будет писать в базу данных и выполнять все операции.
я, конечно, сам “со вчера” изучаю Python, но, мне кажется, если мультипотоковое приложение отправляет все потоки работы с базой данных к одному потоку, то какой смысл тогда этой мультипоточности?
скажите а как это sqlite не работает в потоках?
есть ли какой-то аналог perl-овского DBI?

может кто-нибудь натыкался на пример приложения, которое использует os.fork(), SocketServer, threading, и работает с базами данных mysql (извините за мою наглость ))
Soteric
Если запрос к базе занимает малую часть от общего времени обработки сообщения, то преимущество все равно будет. Да, это станет одним из узких мест в системе, но вероятно не самым тормозящим.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB