Найти - Пользователи
Полная версия: Ретрансляция udp multicast в http на Twisted
Начало » Network » Ретрансляция udp multicast в http на Twisted
1 2
init
Необходимо слушать порт и принимать с мультикаста udp-пакеты, которые по запросу отдавать по http. Есть что-то вроде этого:

class MyRequestHandler(http.Request):
    def process(self):
        #здесь отдаем пакеты клиенту
class MyHttp(http.HTTPChannel):
    requestFactory = MyRequestHandler
class MyHttpFactory(http.HTTPFactory):
    protocol = MyHttp
reactor.listenTCP(8080, MyHttpFactory())
class MulticastServerUDP(DatagramProtocol):
    def startProtocol(self):
        self.transport.joinGroup('229.254.1.11')
    def datagramReceived(self, data, address):
        # здесь получаем пакеты
reactor.listenMulticast(9000, MulticastServerUDP())
reactor.run()
Необходимо сделать так, чтобы при запуске process() он каким-то образом сигнализировал о своем появлении datagramReceived() и последний при получении пакета отдавал его в self.write у process(). То есть при каждом вызове datagramReceived во все активные http-соединения писался бы полученный пакет. Как это сделать?
Читаю про Producer/Consumer, Deferred, но так и не понял что мне здесь нужно.
Андрей Светлов
непонятно. У тебя персистентные http соединения? Если нет - соединение закроется сразу же после отдавания пакета. И http connection обычно управляется со стороны клиента, т.е. отдает что попросили без излишней самодеятельности. В AJAX есть способы обойти, но они зависят от используемой библиотеки.

Что-то, кажется, не совсем “чисто” в архитектуре. Либо приведенный пример слишком мало ее раскрывает.
init
Соединение открывается и по нему непрерывно отдаются данные. Я так понял нужно на каждый вызов datagramReceived вызывать MyRequestHandler.write(data), так соединение не закрывается. Проблема не в отдаче - это все нормально работает, а как связать получение пакета по udp и отдачу его в write()
Андрей Светлов
экземпляр MyHttpFactory - глобальный.
В HTTPChannel (который на самом деле протокол) на allContentReceived/connectionLost регистрируешь/прибиваешь протокол в своем списке протоколов для фабрики. allContentReceived - потому что на этот момент уже все передано и настроено.
В datagramReceived проходишься по всем протоколам в фабрике и каждому говоришь proto.transport.write(…).

Повторяю: метод кривоват. Не уверен, что нормально будет работать со всеми браузерами. Конечно же, тебе он нужен не для браузера - но тогда зачем http. Еще сильно не уверен в том, что такие http будут нормально проходить через proxy.
init
Вообще клиент не браузер, а видео-плеер, работа через прокси не нужна. Попробовал сделать вот так, вроде как работает.

class MyRequestHandler(http.Request):
    def out(self, data):
        #этот callback вызывается на каждую datagramReceived, в нем же в deffered добавляем снова себя
        self.write(data)
        if self.transport.connected:
            d = defer.Deferred()
            d.addCallback(self.out)
            self.mserver.deferreds.append(d)
        else:
            self.finish()
    def process(self):
        #соединение установлено, создаем первый callback
        d = defer.Deferred()
        d.addCallback(self.out)
        self.mserver.deferreds.append(d)
 
class MyHttp(http.HTTPChannel):
    requestFactory = MyRequestHandler
 
class MyHttpFactory(http.HTTPFactory):
    protocol = MyHttp
  
class MulticastServerUDP(DatagramProtocol):
    def __init__(self):
        #deffereds - это список объектов Deffered, в каждый из которых добавляются колбэки при подключении новых клиентов по http
        self.deffereds = []
    def startProtocol(self):
        self.transport.joinGroup('229.254.1.11')
 
    def datagramReceived(self, data, address):
        # получаем пакет и посылаем его по всем затребовавшим callback'ам
        deferreds = copy(self.deferreds)
        self.deferreds = []
        for d in deferreds:
            d.callback(data)
mcast = MulticastServerUDP()
reactor.listenMulticast(9000, mcast)
mhttp = MyHttpFactory()
mhttp.mserver = mcast
reactor.listenTCP(8080, mhttp)
reactor.run()

Оцените кривость решения?
init
Что-то я перечитал еще раз ваш совет и подумал, что явно перемудрил с Deferred. Переписал так:

class MyRequestHandler(http.Request):
    def process(self):
        #добавляем себя в список к MulticastServerUDP
        self.transport.protocol.factory.mserver.outputs.append(self)
 
class MyHttp(http.HTTPChannel):
    requestFactory = MyRequestHandler
 
class MyHttpFactory(http.HTTPFactory):
    protocol = MyHttp
  
class MulticastServerUDP(DatagramProtocol):
    def __init__(self):
        #outputs - список экземпляров http.Request, в которые нужно писать получаемые данные
        self.outputs = []
 
    def startProtocol(self):
        self.transport.joinGroup('229.254.1.11')
 
    def datagramReceived(self, data, address):
        # получаем пакет и посылаем его по всем затребовавшим
        for out in outputs:
            out.write(data)
 
mcast = MulticastServerUDP()
reactor.listenMulticast(9000, mcast)
 
mhttp = MyHttpFactory()
mhttp.mserver = mcast
reactor.listenTCP(8080, mhttp)
 
reactor.run()
Андрей Светлов
Второй пример гораздо лучше. Deferred следует использовать правильно и только там, где действительно необходимо. Но все же стоит вставить удаление из MulticastServerUDP.outputs завершившегося канала. Причина завершения может быть самой разной, начиная с тривиального обрыва соединения.
init
Удаление сделал, просто забыл его скопировать:

class MyRequestHandler(http.Request):
def connectionLost(self, reason):
self.transport.protocol.factory.mserver. outputs.remove(self)
Только вот что странно, через сутки работы это приложение распухает до 1гб в памяти. Вроде все объекты удаляются, никак не пойму что там копится..
Андрей Светлов
Смотреть через модуль gc.
Для начала можно форсировано вызывать gc.collect() (например, каждую минуту).
init
Не помогает. collect() возвращает всегда одно и то же число 17, память продолжает съедаться. В gc.garbage пусто.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB