Уведомления

Группа в Telegram: @pythonsu
  • Начало
  • » Network
  • » Dns proxy serever с поддержкой "черного" списка доменных имен [RSS Feed]

#1 Ноя. 24, 2016 03:20:15

PooH
От:
Зарегистрирован: 2006-12-05
Сообщения: 1948
Репутация: +  72  -
Профиль   Отправить e-mail  

Dns proxy serever с поддержкой "черного" списка доменных имен

Бери ношу по себе, чтоб не падать при ходьбе.



Вот здесь один из первых отарков съел лаборанта. Это был такой умный отарк, что понимал даже теорию относительности. Он разговаривал с лаборантом, а потом бросился на него и загрыз…

Офлайн

#2 Ноя. 24, 2016 10:07:22

Tsarneba
Зарегистрирован: 2016-11-22
Сообщения: 8
Репутация: +  0  -
Профиль   Отправить e-mail  

Dns proxy serever с поддержкой "черного" списка доменных имен

Проверьте пожалуйста правильность кода. Подходит ли он под мои задачи?

1.Для параметров используется конфигурационный файл, считывающийся при запуске сервера;
2.“Черный” список доменных имен находится в конфигурационном файле;
3.Адрес вышестоящего сервера также находится в конфигурационном файле;
4.Сервер принимает запросы DNS-клиентов на стандартном порту;
5.Если запрос содержит доменное имя, включенное в “черный” список, сервер возвращает клиенту ответ, заданный конфигурационным файлом (варианты: not resolved, адрес в локальной сети, …).
6.Если запрос содержит доменное имя, не входящее в “черный” список, сервер перенаправляет запрос вышестоящему серверу, дожидается ответа и возвращает его клиенту

 import datetime
import sys
import time
import threading
import traceback
import SocketServer
import dns.resolver
 
bls = ["zen.spamhaus.org", "spam.abuse.ch", "cbl.abuseat.org", "virbl.dnsbl.bit.nl", "dnsbl.inps.de", 
    "ix.dnsbl.manitu.net", "dnsbl.sorbs.net", "bl.spamcannibal.org", "bl.spamcop.net", 
    "xbl.spamhaus.org", "pbl.spamhaus.org", "dnsbl-1.uceprotect.net", "dnsbl-2.uceprotect.net", 
    "dnsbl-3.uceprotect.net", "db.wpbl.info"]
 
if len(sys.argv) != 2:
    print 'Usage: %s <ip>' %(sys.argv[0])
    quit()
 
myIP = sys.argv[1]
 
for bl in bls:
    try:
        my_resolver = dns.resolver.Resolver()
        query = '.'.join(reversed(str(myIP).split("."))) + "." + bl
        answers = my_resolver.query(query, "A")
        answer_txt = my_resolver.query(query, "TXT")
        print 'IP: %s IS listed in %s (%s: %s)' %(myIP, bl, answers[0], answer_txt[0])
    except dns.resolver.NXDOMAIN:
        print 'IP: %s NOT resolved %s' %(myIP, bl)
        raise RuntimeError("Something bad happened")
        
from dnslib import *
class DomainName(str):
    def __getattr__(self, item):
        return DomainName(item + '.' + self)
D = DomainName('example.com')
IP = '127.0.0.1'
TTL = 60 * 5
PORT = 5053
soa_record = SOA(
    mname=D.ns1,  # primary name server
    rname=D.andrei,  # email of the domain administrator
    times=(
        201307231,  # serial number
        60 * 60 * 1,  # refresh
        60 * 60 * 3,  # retry
        60 * 60 * 24,  # expire
        60 * 60 * 1,  # minimum
    )
)
ns_records = [NS(D.ns1), NS(D.ns2)]
records = {
    D: [A(IP), AAAA((0,) * 16), MX(D.mail), soa_record] + ns_records,
    D.ns1: [A(IP)],  # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3)
    D.ns2: [A(IP)],
    D.mail: [A(IP)],
    D.andrei: [CNAME(D)],
}
def dns_response(data):
    request = DNSRecord.parse(data)
    print request
    reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)
    qname = request.q.qname
    qn = str(qname)
    qtype = request.q.qtype
    qt = QTYPE[qtype]
    if qn == D or qn.endswith('.' + D):
        for name, rrs in records.iteritems():
            if name == qn:
                for rdata in rrs:
                    rqt = rdata.__class__.__name__
                    if qt in ['*', rqt]:
                        reply.add_answer(RR(rname=qname, rtype=QTYPE[rqt], rclass=1, ttl=TTL, rdata=rdata))
        for rdata in ns_records:
            reply.add_ns(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata))
        reply.add_ns(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record))
    print "---- Reply:\n", reply
    return reply.pack()
class BaseRequestHandler(SocketServer.BaseRequestHandler):
    def get_data(self):
        raise NotImplementedError
    def send_data(self, data):
        raise NotImplementedError
    def handle(self):
        now = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
        print "\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0],
                                               self.client_address[1])
        try:
            data = self.get_data()
            print len(data), data.encode('hex')  # repr(data).replace('\\x', '')[1:-1]
            self.send_data(dns_response(data))
        except Exception:
            traceback.print_exc(file=sys.stderr)
class TCPRequestHandler(BaseRequestHandler):
    def get_data(self):
        data = self.request.recv(8192).strip()
        sz = int(data[:2].encode('hex'), 16)
        if sz < len(data) - 2:
            raise Exception("Wrong size of TCP packet")
        elif sz > len(data) - 2:
            raise Exception("Too big TCP packet")
        return data[2:]
    def send_data(self, data):
        sz = hex(len(data))[2:].zfill(4).decode('hex')
        return self.request.sendall(sz + data)
class UDPRequestHandler(BaseRequestHandler):
    def get_data(self):
        return self.request[0].strip()
    def send_data(self, data):
        return self.request[1].sendto(data, self.client_address)
if __name__ == '__main__':
    print "Starting nameserver..."
    servers = [
        SocketServer.ThreadingUDPServer(('', PORT), UDPRequestHandler),
        SocketServer.ThreadingTCPServer(('', PORT), TCPRequestHandler),
    ]
    for s in servers:
        thread = threading.Thread(target=s.serve_forever)  # that thread will start one more thread for each request
        thread.daemon = True  # exit the server thread when the main thread terminates
        thread.start()
        print "%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name)
    try:
        while 1:
            time.sleep(1)
            sys.stderr.flush()
            sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    finally:
        for s in servers:
            s.shutdown()

Прикреплённый файлы:
attachment simple2.py (5,0 KБ)

Офлайн

#3 Ноя. 24, 2016 10:19:11

Tsarneba
Зарегистрирован: 2016-11-22
Сообщения: 8
Репутация: +  0  -
Профиль   Отправить e-mail  

Dns proxy serever с поддержкой "черного" списка доменных имен

PooH
Бери ношу по себе, чтоб не падать при ходьбе.
Я это прекрасно понимаю Хочется турбо старт себе устроить) по этому обратился за помощью.
Спасибо.

Офлайн

#4 Ноя. 24, 2016 10:41:38

FishHook
От:
Зарегистрирован: 2011-01-08
Сообщения: 8312
Репутация: +  568  -
Профиль   Отправить e-mail  

Dns proxy serever с поддержкой "черного" списка доменных имен

   
class DomainName(str):
    def __getattr__(self, item):
        return DomainName(item + '.' + self)

Какая-то сомнительной полезности штука. Адрес может с цифры начинаться типа 3d.com, на этом ваш класс упадет
 c = DomainName("google.ru")
print c.3d  # EXCEPTION!



Офлайн

#5 Ноя. 24, 2016 15:39:47

Tsarneba
Зарегистрирован: 2016-11-22
Сообщения: 8
Репутация: +  0  -
Профиль   Отправить e-mail  

Dns proxy serever с поддержкой "черного" списка доменных имен

FishHook
А касательно остального?все нормально?

Отредактировано Tsarneba (Ноя. 24, 2016 15:40:17)

Офлайн

  • Начало
  • » Network
  • » Dns proxy serever с поддержкой "черного" списка доменных имен[RSS Feed]

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version